
Remove all unused files and benchmark methods, adapt the MAX benchmark to use the cacher, replace the manually set NUMA configuration with dynamic adaptation to the configured affinity, and add two more template options to the worker that control whether column a is cached as well and whether scan_b waits on the caching.
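The two new worker template options referred to above appear in the MAX_benchmark.cpp diff below as constexpr flags cache_a and wait_b, passed as the third and fourth template argument of Query_Wrapper. The following minimal, self-contained sketch shows how such compile-time options can be wired into the worker; the flag names and the four-parameter template signature are taken from the diff, while the type name Query_Wrapper_Sketch and the method bodies are illustrative assumptions rather than the pipeline's actual implementation.

// Hedged sketch only: Query_Wrapper_Sketch is a stand-in name, and the
// comments describe assumed behaviour of the cache_a / wait_b options.
#include <cstdint>
#include <iostream>

template<typename base_t, bool simple_query, bool cache_a, bool wait_b>
struct Query_Wrapper_Sketch {
    // filter pass over column a; with cache_a the chunk would additionally
    // be handed to the cacher so later passes can read it from faster memory
    void scan_a(uint32_t gid, uint32_t gcnt, uint32_t tid) {
        if constexpr (cache_a) {
            // submit the current chunk of column a to the cacher (assumed behaviour)
        }
        // ... filter the current chunk of column a ...
    }
    // caching pass over column b; with wait_b the call blocks until the
    // cacher reports the chunk as fully copied
    void scan_b(uint32_t gid, uint32_t gcnt, uint32_t tid) {
        // ... submit the current chunk of column b to the cacher ...
        if constexpr (wait_b) {
            // wait for the caching of this chunk to complete (assumed behaviour)
        }
    }
};

int main() {
    // mirrors the defaults chosen in MAX_benchmark.cpp: both options disabled
    constexpr bool cache_a = false;
    constexpr bool wait_b = false;
    Query_Wrapper_Sketch<uint64_t, true, cache_a, wait_b> qw;
    qw.scan_a(0, 1, 0);
    qw.scan_b(0, 1, 0);
    std::cout << "cache_a=" << cache_a << ", wait_b=" << wait_b << std::endl;
    return 0;
}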

master
Constantin Fürst, 11 months ago
commit 6cc49daf89
  1. qdp_project/CMakeLists.txt (26 changed lines)
  2. qdp_project/bench_all_dimes.sh (10 changed lines)
  3. qdp_project/bench_max.sh (7 changed lines)
  4. qdp_project/cmake_all_dimes.sh (33 changed lines)
  5. qdp_project/src/benchmark/DIMES_benchmark.cpp (240 changed lines)
  6. qdp_project/src/benchmark/DIMES_cores_benchmark.cpp (260 changed lines)
  7. qdp_project/src/benchmark/MAX_benchmark.cpp (81 changed lines)
  8. qdp_project/src/benchmark/QDP_minimal.h (147 changed lines)
  9. qdp_project/src/benchmark/doubly_filtered_agg.cpp (149 changed lines)
  10. qdp_project/src/benchmark/filter_aggregate_pipeline.cpp (184 changed lines)
  11. qdp_project/src/benchmark/latency.cpp (188 changed lines)
  12. qdp_project/src/benchmark/micro_benchmarks.cpp (271 changed lines)
  13. qdp_project/src/benchmark/pipelines/DIMES_scan_filter_pipe.h (391 changed lines)
  14. qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h (199 changed lines)
  15. qdp_project/src/benchmark/pipelines/scan_filter_pipe.h (387 changed lines)
  16. qdp_project/src/utils/execution_modes.h (41 changed lines)

qdp_project/CMakeLists.txt (26 changed lines)

@@ -20,12 +20,6 @@ set(SUPPRESS_WARNINGS "-Wno-literal-suffix -Wno-volatile")
set(DEBUG_FLAGS "-g3" "-ggdb")
set(RELEASE_FLAGS "-O3")
#set pcm location
set(PCM_LOCATION ./thirdParty/pcm)
set(PCM_LINKS -lpcm -L${CMAKE_CURRENT_LIST_DIR}/${PCM_LOCATION}/build/lib)
# pass the in formation about the shared library location to the linker
link_directories(${CMAKE_CURRENT_LIST_DIR}/${PCM_LOCATION}/build/lib)
#set flags used for Release and Debug build type
add_compile_options(
"$<$<CONFIG:Release>:${RELEASE_FLAGS}>"
@@ -71,34 +65,18 @@ add_definitions(-DTHREAD_GROUP_MULTIPLIER=${THREAD_FACTOR})
eval(PINNING "cpu;numa" "cpu")
add_definitions(-DPINNING=$<STREQUAL:${PINNING},cpu>)
eval(PCM_M "true;false" "false")
add_definitions(-DPCM_M=$<STREQUAL:${PCM_M},true>)
add_definitions(${PCM_LINKS})
# build directory
set(CMAKE_BINARY_DIR "../bin") #relative to inside build
set(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR})
# include directories
include_directories(src/utils)
include_directories(src/algorithm)
include_directories(src/algorithm/operators)
include_directories(thirdParty/pcm/src)
# link libraries
link_libraries(-lnuma -lpthread)
link_libraries(-lnuma -lpthread -l:libdml.a)
# Add targets only below
# specify build targets
add_executable(FilterAggregatePipeline src/benchmark/filter_aggregate_pipeline.cpp)
add_executable(DoublyFiltered src/benchmark/doubly_filtered_agg.cpp)
add_executable(DIMESBench src/benchmark/DIMES_benchmark.cpp)
add_executable(DIMESCoreBench src/benchmark/DIMES_cores_benchmark.cpp)
add_executable(MicroBench src/benchmark/micro_benchmarks.cpp)
add_executable(MAXBench src/benchmark/MAX_benchmark.cpp
src/benchmark/QDP_minimal.h)
target_link_libraries(MAXBench libpcm.so)
add_executable(LatencyBench src/benchmark/latency.cpp)
add_executable(MAXBench src/benchmark/MAX_benchmark.cpp)

qdp_project/bench_all_dimes.sh (10 changed lines)

@@ -1,10 +0,0 @@
#!bin/bash
../bin/DIMESBench_gus
../bin/DIMESBench_guc
../bin/DIMESBench_gls
../bin/DIMESBench_glc
../bin/DIMESBench_lus
../bin/DIMESBench_luc
../bin/DIMESBench_lls
../bin/DIMESBench_llc

qdp_project/bench_max.sh (7 changed lines)

@@ -3,13 +3,8 @@
current_date_time=$(date)
echo "Benchmark start at: $current_date_time"
../bin/MAXBench_gcc
cp ../results/max_q-complex_bm-global_bl-unlimited_tc-121MiB-2MiB.csv ../results/max_q-complex_bm-global_bl-unlimited_tc-121MiB-2MiB_pin_c_HBM.csv
../bin/MAXBench_gcn
cp ../results/max_q-complex_bm-global_bl-unlimited_tc-121MiB-2MiB.csv ../results/max_q-complex_bm-global_bl-unlimited_tc-121MiB-2MiB_pin_n_HBM.csv
../bin/MAXBench
current_date_time=$(date)
echo "Benchmark end at: $current_date_time"

qdp_project/cmake_all_dimes.sh (33 changed lines)

@@ -1,33 +0,0 @@
#!bin/bash
cmake -DCMAKE_BUILD_TYPE=Release -DWSUPPRESS=suppress -DBARRIER_MODE=global -DBUFFER_LIMIT=unlimited -DQUERY=simple ..
cmake --build . --target DIMESBench
mv ../bin/DIMESBench ../bin/DIMESBench_gus
cmake -DCMAKE_BUILD_TYPE=Release -DWSUPPRESS=suppress -DBARRIER_MODE=global -DBUFFER_LIMIT=unlimited -DQUERY=complex ..
cmake --build . --target DIMESBench
mv ../bin/DIMESBench ../bin/DIMESBench_guc
cmake -DCMAKE_BUILD_TYPE=Release -DWSUPPRESS=suppress -DBARRIER_MODE=global -DBUFFER_LIMIT=limited -DQUERY=simple ..
cmake --build . --target DIMESBench
mv ../bin/DIMESBench ../bin/DIMESBench_gls
cmake -DCMAKE_BUILD_TYPE=Release -DWSUPPRESS=suppress -DBARRIER_MODE=global -DBUFFER_LIMIT=limited -DQUERY=complex ..
cmake --build . --target DIMESBench
mv ../bin/DIMESBench ../bin/DIMESBench_glc
cmake -DCMAKE_BUILD_TYPE=Release -DWSUPPRESS=suppress -DBARRIER_MODE=local -DBUFFER_LIMIT=unlimited -DQUERY=simple ..
cmake --build . --target DIMESBench
mv ../bin/DIMESBench ../bin/DIMESBench_lus
cmake -DCMAKE_BUILD_TYPE=Release -DWSUPPRESS=suppress -DBARRIER_MODE=local -DBUFFER_LIMIT=unlimited -DQUERY=complex ..
cmake --build . --target DIMESBench
mv ../bin/DIMESBench ../bin/DIMESBench_luc
cmake -DCMAKE_BUILD_TYPE=Release -DWSUPPRESS=suppress -DBARRIER_MODE=local -DBUFFER_LIMIT=limited -DQUERY=simple ..
cmake --build . --target DIMESBench
mv ../bin/DIMESBench ../bin/DIMESBench_lls
cmake -DCMAKE_BUILD_TYPE=Release -DWSUPPRESS=suppress -DBARRIER_MODE=local -DBUFFER_LIMIT=limited -DQUERY=complex ..
cmake --build . --target DIMESBench
mv ../bin/DIMESBench ../bin/DIMESBench_llc

qdp_project/src/benchmark/DIMES_benchmark.cpp (240 changed lines)

@@ -1,240 +0,0 @@
#include <atomic>
#include <barrier>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <future>
#include <iostream>
#include <limits>
#include <list>
#include <mutex>
#include <queue>
#include <thread>
#include <tuple>
#include <utility>
#include <numa.h>
#ifndef THREAD_GROUP_MULTIPLIER
#define THREAD_GROUP_MULTIPLIER 8
#endif
#ifndef QUERY
#define QUERY 1
#endif
#ifndef BARRIER_MODE
#define BARRIER_MODE "global"
#endif
#ifndef BUFFER_LIMIT
#define BUFFER_LIMIT 1
#endif
#include "const.h"
#include "file_output.h"
#include "array_utils.h"
#include "timer_utils.h"
#include "barrier_utils.h"
#include "cpu_set_utils.h"
#include "iterable_range.h"
#include "memory_literals.h"
#include "pipelines/DIMES_scan_filter_pipe.h"
#include "aggregation.h"
#include "filter.h"
using base_t = uint64_t;
base_t sum_check(base_t compare_value, base_t* row_A, base_t* row_B, size_t row_size) {
base_t sum = 0;
for(int i = 0; i < row_size / sizeof(base_t); ++i) {
sum += (row_A[i] < compare_value) * row_B[i];
}
return sum;
}
base_t sum_check_complex(base_t compare_value_a, base_t compare_value_b, base_t* row_A, base_t* row_B, size_t row_size) {
base_t sum = 0;
for(int i = 0; i < row_size / sizeof(base_t); ++i) {
sum += (row_A[i] < compare_value_a && row_B[i] < compare_value_b) * row_B[i];
}
return sum;
}
int main(int argc, char** argv) {
// set constants
const size_t workload_b = 4_GiB;
const base_t compare_value_a = 50;
const base_t compare_value_b = 42;
constexpr bool simple_query = (QUERY == 1);
const size_t thread_count = 6;
std::ofstream out_file;
out_file.open("../results/dimes_"
"q-" + (std::string)(simple_query == true ? "simple" : "complex") +
"_bm-" + (std::string) BARRIER_MODE +
"_bl-" + (std::string)(BUFFER_LIMIT == 1 ? "limited" : "unlimited") +
"_tc-" + std::to_string(thread_count * THREAD_GROUP_MULTIPLIER) + ".csv");
// set benchmark parameter
Linear_Int_Range<uint32_t, 0, 10, 1> run("run");
Exp_Int_Range<size_t, 1_MiB, 8_MiB + 1, 2> chunk_size("chunk_size");
Range<NewPMode, DRAM_base, new_mode_manager, new_mode_manager> mode("mode");
uint32_t remote_node = 3;
uint32_t remote_node_2 = 2;
uint32_t local_node = 10;
print_to_file(out_file, generateHead(run, chunk_size, mode), "thread_group", "time",
#ifdef THREAD_TIMINGS
"scan_a", "scan_b", "aggr_j",
#endif
#ifdef BARRIER_TIMINGS
"wait_scan_a", "wait_scan_b", "wait_aggr_j",
#endif
"result");
/*** alloc data and buffers ************************************************/
base_t* data_a = (base_t*) numa_alloc_onnode(workload_b, remote_node);
base_t* data_b = (base_t*) numa_alloc_onnode(workload_b, remote_node_2);
base_t* data_a_hbm = (base_t*) numa_alloc_onnode(workload_b, local_node);
base_t* data_b_hbm = (base_t*) numa_alloc_onnode(workload_b, local_node);
fill_mt<base_t>(data_a, workload_b, 0, 100, 42);
fill_mt<base_t>(data_b, workload_b, 0, 100, 420);
std::memcpy(data_a_hbm, data_a, workload_b);
std::memcpy(data_b_hbm, data_b, workload_b);
base_t* results = (base_t*) numa_alloc_onnode(THREAD_GROUP_MULTIPLIER * thread_count * sizeof(base_t), remote_node);
std::ofstream check_file;
check_file.open("../results/dimes_"
"q-" + (std::string)(simple_query == true ? "simple" : "complex") +
"_bm-" + (std::string) BARRIER_MODE +
"_bl-" + (std::string)(BUFFER_LIMIT == 1 ? "limited" : "unlimited") +
"_tc-" + std::to_string(thread_count * THREAD_GROUP_MULTIPLIER) + ".checksum");
if constexpr (QUERY == 1) {
//calculate simple checksum if QUERY == 1 -> simple query is applied
check_file << sum_check(compare_value_a, data_a, data_b, workload_b);
} else {
check_file << sum_check_complex(compare_value_a, compare_value_b, data_a, data_b, workload_b);
}
check_file.close();
std::string iteration("init");
Query_Wrapper<base_t, simple_query>* qw = nullptr;
while(iteration != "false") {
std::promise<void> p;
std::shared_future<void> ready_future(p.get_future());
if(iteration != "run") {
if(qw != nullptr) {
delete qw;
}
std::cout << "Changing to mode " << mode.current << " chunksize " << chunk_size.current << std::endl;
uint8_t tc_filter = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_A);
uint8_t tc_copy = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_B);
uint8_t tc_agg = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, AGGR_J);
switch(mode.current) {
case NewPMode::DRAM_base:
qw = new Query_Wrapper<base_t, simple_query>(&ready_future, workload_b, chunk_size.current, data_a, data_b, results, local_node, remote_node,
tc_filter, tc_copy, tc_agg, mode.current, THREAD_GROUP_MULTIPLIER, (base_t) 50, (base_t) 42, true);
break;
case NewPMode::HBM_base:
qw = new Query_Wrapper<base_t, simple_query>(&ready_future, workload_b, chunk_size.current, data_a_hbm, data_b_hbm, results, local_node, remote_node,
tc_filter, tc_copy, tc_agg, mode.current, THREAD_GROUP_MULTIPLIER, (base_t) 50, (base_t) 42, true);
break;
case NewPMode::Mixed_base:
qw = new Query_Wrapper<base_t, simple_query>(&ready_future, workload_b, chunk_size.current, data_a, data_b_hbm, results, local_node, remote_node,
tc_filter, tc_copy, tc_agg, mode.current, THREAD_GROUP_MULTIPLIER, (base_t) 50, (base_t) 42, true);
break;
case NewPMode::Prefetch:
qw = new Query_Wrapper<base_t, simple_query>(&ready_future, workload_b, chunk_size.current, data_a, data_b, results, local_node, remote_node,
tc_filter, tc_copy, tc_agg, mode.current, THREAD_GROUP_MULTIPLIER, (base_t) 50, (base_t) 42, false);
break;
}
}
qw->ready_future = &ready_future;
qw->clear_buffers();
auto filter_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_a(gid, gcnt, tid); };
auto copy_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_b(gid, gcnt, tid); };
auto aggregation_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->aggr_j(gid, gcnt, tid); };
std::vector<std::thread> filter_pool;
std::vector<std::thread> copy_pool;
std::vector<std::thread> agg_pool;
uint8_t tc_filter = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_A);
uint8_t tc_copy = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_B);
uint8_t tc_agg = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, AGGR_J);
int thread_id = 0;
// std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(28, 42), std::make_pair(84, 98)}; // node 2 heacboehm II
//std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(32, 48), std::make_pair(96, 112)}; // node 2 heacboehm
//std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(24, 36), std::make_pair(120, 132)}; // node 2 sapphire rapids
//std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(24, 48)}; // node 2+3 sapphire rapids
std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(0, 48)}; // node 0-3 sapphire rapids
for(uint32_t gid = 0; gid < THREAD_GROUP_MULTIPLIER; ++gid) {
for(uint32_t tid = 0; tid < tc_filter; ++tid) {
filter_pool.emplace_back(filter_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
pin_thread_in_range(filter_pool.back(), thread_id++, pinning_ranges);
}
// if tc_copy == 0 this loop is skipped
for(uint32_t tid = 0; tid < tc_copy; ++tid) {
copy_pool.emplace_back(copy_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
pin_thread_in_range(copy_pool.back(), thread_id++, pinning_ranges);
}
for(uint32_t tid = 0; tid < tc_agg; ++tid) {
agg_pool.emplace_back(aggregation_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
pin_thread_in_range(agg_pool.back(), thread_id++, pinning_ranges);
}
}
auto start = std::chrono::steady_clock::now();
p.set_value();
for(std::thread& t : filter_pool) { t.join(); }
for(std::thread& t : copy_pool) { t.join(); }
for(std::thread& t : agg_pool) { t.join(); }
Aggregation<base_t, Sum, load_mode::Aligned>::apply(results, results, sizeof(base_t) * tc_agg * THREAD_GROUP_MULTIPLIER);
auto end = std::chrono::steady_clock::now();
constexpr double nanos_per_second = ((double)1000) * 1000 * 1000;
uint64_t nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
double seconds = (double)(nanos) / nanos_per_second;
print_to_file(out_file, run, chunk_size, new_mode_manager::string(mode.current), THREAD_GROUP_MULTIPLIER, seconds,
#ifdef THREAD_TIMINGS
qw->trt->summarize_time(0), qw->trt->summarize_time(1), qw->trt->summarize_time(2),
#endif
#ifdef BARRIER_TIMINGS
qw->bt->summarize_time(0), qw->bt->summarize_time(1), qw->bt->summarize_time(2),
#endif
results[0]);
iteration = IterateOnce(run, chunk_size, mode);
}
numa_free(data_b_hbm, workload_b);
numa_free(data_a, workload_b);
numa_free(data_b, workload_b);
numa_free(results, THREAD_GROUP_MULTIPLIER * thread_count * sizeof(base_t));
}

qdp_project/src/benchmark/DIMES_cores_benchmark.cpp (260 changed lines)

@@ -1,260 +0,0 @@
#include <atomic>
#include <barrier>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <future>
#include <iostream>
#include <limits>
#include <list>
#include <mutex>
#include <queue>
#include <thread>
#include <tuple>
#include <utility>
#include <numa.h>
#ifndef QUERY
#define QUERY 1
#endif
#ifndef BARRIER_MODE
#define BARRIER_MODE "global"
#endif
#define BUFFER_LIMIT 0
#include "const.h"
#include "file_output.h"
#include "array_utils.h"
#include "timer_utils.h"
#include "barrier_utils.h"
#include "cpu_set_utils.h"
#include "iterable_range.h"
#include "memory_literals.h"
#include "pipelines/DIMES_scan_filter_pipe.h"
#include "aggregation.h"
#include "filter.h"
using base_t = uint64_t;
base_t sum_check(base_t compare_value, base_t* row_A, base_t* row_B, size_t row_size) {
base_t sum = 0;
for(int i = 0; i < row_size / sizeof(base_t); ++i) {
sum += (row_A[i] < compare_value) * row_B[i];
}
return sum;
}
base_t sum_check_complex(base_t compare_value_a, base_t compare_value_b, base_t* row_A, base_t* row_B, size_t row_size) {
base_t sum = 0;
for(int i = 0; i < row_size / sizeof(base_t); ++i) {
sum += (row_A[i] < compare_value_a && row_B[i] < compare_value_b) * row_B[i];
}
return sum;
}
int main(int argc, char** argv) {
// set constants
const size_t workload_b = 4_GiB;
const size_t chunk_size = 2_MiB;
const base_t compare_value_a = 50;
const base_t compare_value_b = 42;
constexpr bool simple_query = (QUERY == 1);
std::ofstream out_file;
out_file.open("../results/dimes_cores_"
"q-" + (std::string)(simple_query == true ? "simple" : "complex") +
"_bm-" + (std::string) BARRIER_MODE +
"_bl-" + (std::string)(BUFFER_LIMIT == 1 ? "limited" : "unlimited") +
".csv");
// set benchmark parameter
Linear_Int_Range<uint32_t, 0, 3, 1> run("run");
Exp_Int_Range<uint32_t, 1, 4+1, 2> scan_a_thread("scan_a_tc");
Exp_Int_Range<uint32_t, 1, 4+1, 2> scan_b_thread("scan_b_tc");
Exp_Int_Range<uint32_t, 1, 4+1, 2> aggr_j_thread("aggr_j_tc");
Linear_Int_Range<uint32_t, 1, 16+1, 1> thread_group_count("thread_group_c");
Range<NewPMode, DRAM_base, new_mode_manager, new_mode_manager> mode("mode");
uint32_t remote_node = 1;
uint32_t remote_node_2 = 0;//on heacboehm II: node 0 is two hops away from node 2 -> prefetching is more beneficial
uint32_t local_node = 2;
print_to_file(out_file, generateHead(run, thread_group_count, mode, scan_a_thread, scan_b_thread, aggr_j_thread),
"time",
#ifdef THREAD_TIMINGS
"scan_a", "scan_b", "aggr_j",
#endif
#ifdef BARRIER_TIMINGS
"wait_scan_a", "wait_scan_b", "wait_aggr_j",
#endif
"result");
/*** alloc data and buffers ************************************************/
base_t* data_a = (base_t*) numa_alloc_onnode(workload_b, remote_node);
base_t* data_b = (base_t*) numa_alloc_onnode(workload_b, remote_node_2);
base_t* data_a_hbm = (base_t*) numa_alloc_onnode(workload_b, local_node);
base_t* data_b_hbm = (base_t*) numa_alloc_onnode(workload_b, local_node);
fill_mt<base_t>(data_a, workload_b, 0, 100, 42);
fill_mt<base_t>(data_b, workload_b, 0, 100, 420);
std::memcpy(data_a_hbm, data_a, workload_b);
std::memcpy(data_b_hbm, data_b, workload_b);
base_t* results = (base_t*) numa_alloc_onnode(thread_group_count.max * aggr_j_thread.max * sizeof(base_t), remote_node);
std::ofstream check_file;
check_file.open("../results/dimes_cores_"
"q-" + (std::string)(simple_query == true ? "simple" : "complex") +
"_bm-" + (std::string) BARRIER_MODE +
"_bl-" + (std::string)(BUFFER_LIMIT == 1 ? "limited" : "unlimited") +
".checksum");
if constexpr (QUERY == 1) {
//calculate simple checksum if QUERY == 1 -> simple query is applied
check_file << sum_check(compare_value_a, data_a, data_b, workload_b);
} else {
check_file << sum_check_complex(compare_value_a, compare_value_b, data_a, data_b, workload_b);
}
check_file.close();
std::string iteration("init");
Query_Wrapper<base_t, simple_query>* qw = nullptr;
while(iteration != "false") {
std::promise<void> p;
std::shared_future<void> ready_future(p.get_future());
// skipping iteration through scan_b_thread while not used
while(simple_query && mode.current != NewPMode::Prefetch && scan_b_thread.current != 1) {
iteration = IterateOnce(run, thread_group_count, mode, scan_a_thread, scan_b_thread, aggr_j_thread);
}
if(iteration != "run") {
std::cout << "Changing to mode " << mode.current
<< " thread_group_count " << thread_group_count.current
<< " thread_ratio " << scan_a_thread.current <<":"<< scan_b_thread.current <<":"<< aggr_j_thread.current
<< std::endl;
if(qw != nullptr) {
if (iteration == thread_group_count.label) {
} else {
delete qw;
uint32_t sat = scan_a_thread.current;
uint32_t sbt = simple_query && mode.current != NewPMode::Prefetch ? 0 : scan_b_thread.current;
uint32_t ajt = aggr_j_thread.current;
switch(mode.current) {
case NewPMode::DRAM_base:
qw = new Query_Wrapper<base_t, simple_query>(&ready_future, workload_b, chunk_size, data_a, data_b, results, local_node, remote_node,
sat, sbt, ajt, mode.current, thread_group_count.current, (base_t) 50, (base_t) 42, true);
break;
case NewPMode::HBM_base:
qw = new Query_Wrapper<base_t, simple_query>(&ready_future, workload_b, chunk_size, data_a_hbm, data_b_hbm, results, local_node, remote_node,
sat, sbt, ajt, mode.current, thread_group_count.current, (base_t) 50, (base_t) 42, true);
break;
case NewPMode::Mixed_base:
qw = new Query_Wrapper<base_t, simple_query>(&ready_future, workload_b, chunk_size, data_a, data_b_hbm, results, local_node, remote_node,
sat, sbt, ajt, mode.current, thread_group_count.current, (base_t) 50, (base_t) 42, true);
break;
case NewPMode::Prefetch:
qw = new Query_Wrapper<base_t, simple_query>(&ready_future, workload_b, chunk_size, data_a, data_b, results, local_node, remote_node,
sat, sbt, ajt, mode.current, thread_group_count.current, (base_t) 50, (base_t) 42, false);
break;
}
}
}
}
qw->ready_future = &ready_future;
qw->clear_buffers();
auto filter_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_a(gid, gcnt, tid); };
auto copy_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_b(gid, gcnt, tid); };
auto aggregation_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->aggr_j(gid, gcnt, tid); };
std::vector<std::thread> filter_pool;
std::vector<std::thread> copy_pool;
std::vector<std::thread> agg_pool;
uint8_t tc_filter = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_A);
uint8_t tc_copy = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_B);
uint8_t tc_agg = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, AGGR_J);
int thread_id = 0;
// std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(28, 42), std::make_pair(84, 98)}; // node 2 heacboehm II
std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(32, 48), std::make_pair(96, 112)}; // node 2 heacboehm
for(uint32_t gid = 0; gid < thread_group_count.current; ++gid) {
for(uint32_t tid = 0; tid < tc_filter; ++tid) {
filter_pool.emplace_back(filter_lambda, gid, thread_group_count.current, tid);
pin_thread_in_range(filter_pool.back(), thread_id++, pinning_ranges);
}
// if tc_copy == 0 this loop is skipped
for(uint32_t tid = 0; tid < tc_copy; ++tid) {
copy_pool.emplace_back(copy_lambda, gid, thread_group_count.current, tid);
pin_thread_in_range(copy_pool.back(), thread_id++, pinning_ranges);
}
for(uint32_t tid = 0; tid < tc_agg; ++tid) {
agg_pool.emplace_back(aggregation_lambda, gid, thread_group_count.current, tid);
pin_thread_in_range(agg_pool.back(), thread_id++, pinning_ranges);
}
}
auto start = std::chrono::steady_clock::now();
p.set_value();
for(std::thread& t : filter_pool) { t.join(); }
for(std::thread& t : copy_pool) { t.join(); }
for(std::thread& t : agg_pool) { t.join(); }
Aggregation<base_t, Sum, load_mode::Aligned>::apply(results, results, sizeof(base_t) * tc_agg * thread_group_count.current);
auto end = std::chrono::steady_clock::now();
constexpr double nanos_per_second = ((double)1000) * 1000 * 1000;
uint64_t nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
double seconds = (double)(nanos) / nanos_per_second;
print_to_file(out_file, generateHead(run, thread_group_count, mode, scan_a_thread, scan_b_thread, aggr_j_thread),
"time",
#ifdef THREAD_TIMINGS
"scan_a", "scan_b", "aggr_j",
#endif
#ifdef BARRIER_TIMINGS
"wait_scan_a", "wait_scan_b", "wait_aggr_j",
#endif
"result");
print_to_file(out_file, run, thread_group_count.current, new_mode_manager::string(mode.current), scan_a_thread,
(simple_query && mode.current != NewPMode::Prefetch ? 0 : scan_b_thread.current),
aggr_j_thread, seconds,
#ifdef THREAD_TIMINGS
qw->trt->summarize_time(0), qw->trt->summarize_time(1), qw->trt->summarize_time(2),
#endif
#ifdef BARRIER_TIMINGS
qw->bt->summarize_time(0), qw->bt->summarize_time(1), qw->bt->summarize_time(2),
#endif
results[0]);
iteration = IterateOnce(run, thread_group_count, mode, scan_a_thread, scan_b_thread, aggr_j_thread);
}
numa_free(data_b_hbm, workload_b);
numa_free(data_a, workload_b);
numa_free(data_b, workload_b);
numa_free(results, thread_group_count.max * aggr_j_thread.max * sizeof(base_t));
}

qdp_project/src/benchmark/MAX_benchmark.cpp (81 changed lines)

@@ -92,33 +92,36 @@ int main(int argc, char** argv) {
#endif
// set constants
const size_t workload_b = 2_GiB;
const base_t compare_value_a = 50;
const base_t compare_value_b = 42;
constexpr size_t workload_b = 2_GiB;
constexpr base_t compare_value_a = 50;
constexpr base_t compare_value_b = 42;
constexpr bool simple_query = (QUERY == 1);
constexpr bool cache_a = false;
constexpr bool wait_b = false;
constexpr size_t chunk_min = 1_MiB;
constexpr size_t chunk_max = 8_MiB + 1;
constexpr size_t chunk_incr = 128_kiB;
// thread count is 12 here but as the default measurement uses 6
// we must restrict the core assignment of these 12 threads to
// 6 physical cpu cores on the executing node
constexpr size_t thread_count = 12;
const size_t thread_count = 6;
std::ofstream out_file;
out_file.open("../results/max_"
"q-" + (std::string)(simple_query == true ? "simple" : "complex") +
"_bm-" + (std::string) BARRIER_MODE +
"_bl-" + (std::string)(BUFFER_LIMIT == 1 ? "limited" : "unlimited") +
"_tc-" + std::to_string(thread_count * THREAD_GROUP_MULTIPLIER) + "1MiB-2MiB.csv");
"_tc-" + std::to_string(thread_count) + "1MiB-2MiB.csv");
// set benchmark parameter
Linear_Int_Range<uint32_t, 0, 30, 1> run("run");
constexpr size_t chunk_min = 1_MiB; constexpr size_t chunk_max = 8_MiB + 1; constexpr size_t chunk_incr = 128_kiB;
Linear_Int_Range<size_t, chunk_min, chunk_max, chunk_incr> chunk_size("chunk_size");
Range<NewPMode, DRAM_base, new_mode_manager, new_mode_manager> mode("mode");
uint32_t remote_node = 2;
uint32_t remote_node_2 = 2;
uint32_t local_node = 10;
/*uint32_t remote_node = 6;
uint32_t remote_node_2 = 6;
uint32_t local_node = 2;*/
print_to_file(out_file, generateHead(run, chunk_size, mode), "thread_group", "time",
#ifdef THREAD_TIMINGS
"scan_a", "scan_b", "aggr_j",
@@ -133,24 +136,22 @@ int main(int argc, char** argv) {
#endif
"result");
/*** alloc data and buffers ************************************************/
base_t* data_a = (base_t*) numa_alloc_onnode(workload_b, remote_node);
base_t* data_b = (base_t*) numa_alloc_onnode(workload_b, remote_node_2);
base_t* data_a_hbm = (base_t*) numa_alloc_onnode(workload_b, local_node);
base_t* data_b_hbm = (base_t*) numa_alloc_onnode(workload_b, local_node);
base_t* data_a = (base_t*) numa_alloc_local(workload_b);
base_t* data_b = (base_t*) numa_alloc_local(workload_b);
base_t* results = (base_t*) numa_alloc_local(thread_count * sizeof(base_t));
fill_mt<base_t>(data_a, workload_b, 0, 100, 42);
fill_mt<base_t>(data_b, workload_b, 0, 100, 420);
std::memcpy(data_a_hbm, data_a, workload_b);
std::memcpy(data_b_hbm, data_b, workload_b);
base_t* results = (base_t*) numa_alloc_onnode(THREAD_GROUP_MULTIPLIER * thread_count * sizeof(base_t), remote_node);
std::ofstream check_file;
check_file.open("../results/max_"
"q-" + (std::string)(simple_query == true ? "simple" : "complex") +
"_bm-" + (std::string) BARRIER_MODE +
"_bl-" + (std::string)(BUFFER_LIMIT == 1 ? "limited" : "unlimited") +
"_tc-" + std::to_string(thread_count * THREAD_GROUP_MULTIPLIER) + ".checksum");
"_tc-" + std::to_string(thread_count) + ".checksum");
if constexpr (QUERY == 1) {
//calculate simple checksum if QUERY == 1 -> simple query is applied
check_file << sum_check(compare_value_a, data_a, data_b, workload_b);
@@ -160,37 +161,34 @@ int main(int argc, char** argv) {
check_file.close();
std::string iteration("init");
Query_Wrapper<base_t, simple_query>* qw = nullptr;
Query_Wrapper<base_t, simple_query, cache_a, wait_b>* qw = nullptr;
while(iteration != "false") {
std::promise<void> p;
std::shared_future<void> ready_future(p.get_future());
if(iteration != "run") {
if(qw != nullptr) {
delete qw;
}
uint8_t tc_filter = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_A);
uint8_t tc_copy = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_B);
uint8_t tc_agg = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, AGGR_J);
switch(mode.current) {
case NewPMode::DRAM_base:
qw = new Query_Wrapper<base_t, simple_query>(&ready_future, workload_b, chunk_size.current, data_a, data_b, results, local_node, remote_node,
tc_filter, tc_copy, tc_agg, mode.current, THREAD_GROUP_MULTIPLIER, (base_t) 50, (base_t) 42, true);
break;
case NewPMode::HBM_base:
qw = new Query_Wrapper<base_t, simple_query>(&ready_future, workload_b, chunk_size.current, data_a_hbm, data_b_hbm, results, local_node, remote_node,
tc_filter, tc_copy, tc_agg, mode.current, THREAD_GROUP_MULTIPLIER, (base_t) 50, (base_t) 42, true);
break;
case NewPMode::Mixed_base:
qw = new Query_Wrapper<base_t, simple_query>(&ready_future, workload_b, chunk_size.current, data_a, data_b_hbm, results, local_node, remote_node,
tc_filter, tc_copy, tc_agg, mode.current, THREAD_GROUP_MULTIPLIER, (base_t) 50, (base_t) 42, true);
break;
case NewPMode::Prefetch:
qw = new Query_Wrapper<base_t, simple_query>(&ready_future, workload_b, chunk_size.current, data_a, data_b, results, local_node, remote_node,
tc_filter, tc_copy, tc_agg, mode.current, THREAD_GROUP_MULTIPLIER, (base_t) 50, (base_t) 42, false);
case NewPMode::Prefetch:
qw = new Query_Wrapper<base_t, simple_query, cache_a, wait_b>(
&ready_future, workload_b, chunk_size.current,
data_a, data_b, results, tc_filter, tc_copy, tc_agg,
mode.current, 50, 42
);
break;
default:
std::cerr << "[x] Unsupported Execution Mode by this build." << std::endl;
exit(-1);
}
}
@@ -280,10 +278,7 @@ int main(int argc, char** argv) {
iteration = IterateOnce(run, chunk_size, mode);
}
numa_free(data_b_hbm, workload_b);
numa_free(data_a, workload_b);
numa_free(data_b, workload_b);
numa_free(results, THREAD_GROUP_MULTIPLIER * thread_count * sizeof(base_t));
}
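The removal of the hard-coded node ids (remote_node, remote_node_2, local_node) above, together with numa_alloc_local replacing numa_alloc_onnode, implements the "dynamically adapting to the configured affinity" part of the commit message: placement now follows whatever node and CPU set the benchmark is launched with (e.g. via numactl). The following is a hedged sketch, not the code added by this commit, of how thread pinning can be derived from the process affinity instead of hard-coded ranges; it assumes a Linux/glibc environment, and the helper names allowed_cpus() and pin_to_allowed() are invented for illustration.

// Sketch under the assumption that sched_getaffinity()/pthread_setaffinity_np()
// are used; these helpers do not exist in the repository.
#include <sched.h>
#include <pthread.h>
#include <iostream>
#include <thread>
#include <vector>

// collect every CPU the current process is allowed to run on
std::vector<int> allowed_cpus() {
    cpu_set_t set;
    CPU_ZERO(&set);
    sched_getaffinity(0, sizeof(set), &set);
    std::vector<int> cpus;
    for (int cpu = 0; cpu < CPU_SETSIZE; ++cpu) {
        if (CPU_ISSET(cpu, &set)) cpus.push_back(cpu);
    }
    return cpus;
}

// pin a thread to the i-th allowed CPU, wrapping around when there are more
// threads than CPUs (as with the 12 worker threads on 6 physical cores above)
void pin_to_allowed(std::thread& t, size_t index, const std::vector<int>& cpus) {
    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(cpus[index % cpus.size()], &set);
    pthread_setaffinity_np(t.native_handle(), sizeof(set), &set);
}

int main() {
    const std::vector<int> cpus = allowed_cpus();
    std::cout << "spreading worker threads over " << cpus.size() << " allowed cpus" << std::endl;
    std::vector<std::thread> pool;
    for (size_t i = 0; i < 4; ++i) {
        pool.emplace_back([] { /* worker body */ });
        pin_to_allowed(pool.back(), i, cpus);
    }
    for (std::thread& t : pool) t.join();
    return 0;
}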

qdp_project/src/benchmark/QDP_minimal.h (147 changed lines)

@@ -1,147 +0,0 @@
#include <chrono>
#include <iostream>
#include <thread>
#include <future>
#include <numa.h>
#include "const.h"
#include "array_utils.h"
#include "cpu_set_utils.h"
#include "iterable_range.h"
#include "memory_literals.h"
#include "pipelines/MAX_scan_filter_pipe.h"
#include "aggregation.h"
using base_t = uint64_t;
// calculate the checksum for the simple query
base_t sum_check(base_t compare_value, base_t* row_A, base_t* row_B, size_t row_size) {
base_t sum = 0;
for(int i = 0; i < row_size / sizeof(base_t); ++i) {
sum += (row_A[i] < compare_value) * row_B[i];
}
return sum;
}
// calculate the checksum for the complex query
base_t sum_check_complex(base_t compare_value_a, base_t compare_value_b, base_t* row_A, base_t* row_B, size_t row_size) {
base_t sum = 0;
for(int i = 0; i < row_size / sizeof(base_t); ++i) {
sum += (row_A[i] < compare_value_a && row_B[i] < compare_value_b) * row_B[i];
}
return sum;
}
class QDP_minimal {
private:
// values used for comparisons in the filter operations
const base_t compare_value_a = 50;
const base_t compare_value_b = 42;
// define, which numa nodes to use
// Xeon Max: node 0-7 DRAM and 8-15 HBM
// if the nodes are changed, the pinning ranges in run should be adjusted accordingly too
uint32_t dram_node = 2;
uint32_t dram_node_2 = 2;
uint32_t hbm_node = 10;
public:
// results of running qdp, set by run()
base_t result;
base_t checksum;
double exec_time;
// run qdp
void run(const size_t workload_b, size_t chunk_size, uint8_t tc_filter, uint8_t tc_copy, uint8_t tc_agg){
// allocate data
base_t* data_a = (base_t*) numa_alloc_onnode(workload_b, dram_node);
base_t* data_b = (base_t*) numa_alloc_onnode(workload_b, dram_node_2);
base_t* results = (base_t*) numa_alloc_onnode(THREAD_GROUP_MULTIPLIER * tc_agg * sizeof(base_t), dram_node);
// fill the memory with acutal values
fill_mt<base_t>(data_a, workload_b, 0, 100, 42);
fill_mt<base_t>(data_b, workload_b, 0, 100, 420);
// run qdp
run(data_a, data_b, results, workload_b, chunk_size, tc_filter, tc_copy, tc_agg);
// free the allocated memory
numa_free(data_a, workload_b);
numa_free(data_b, workload_b);
numa_free(results, THREAD_GROUP_MULTIPLIER * tc_agg * sizeof(base_t));
}
// run qdp, work on provided memory pointers to enable memory reuse across multiple runs
void run(base_t* data_a, base_t* data_b, base_t* results, const size_t workload_b, size_t chunk_size, uint8_t tc_filter, uint8_t tc_copy, uint8_t tc_agg){
constexpr bool simple_query = (QUERY == 1);
// sync objects
std::promise<void> p;
std::shared_future<void> ready_future(p.get_future());
// create the query wrapper, that is managing the to-be-used threads
Query_Wrapper<base_t, simple_query>* qw = new Query_Wrapper<base_t, simple_query>(&ready_future, workload_b, chunk_size, data_a, data_b, results, hbm_node, dram_node,
tc_filter, tc_copy, tc_agg, NewPMode::Prefetch, THREAD_GROUP_MULTIPLIER, compare_value_a, compare_value_b, false);
// clear buffers to make sure, that they have been written and are fully mapped before running qdp
qw->clear_buffers();
// creating lambdas for executing filter (scan_a), copy (scan_b), and aggregation tasks on the query wrapper
// passing gid (group id), gcnt (group count) and tid (thread id)
auto filter_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_a(gid, gcnt, tid); };
auto copy_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_b(gid, gcnt, tid); };
auto aggregation_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->aggr_j(gid, gcnt, tid); };
// creating thread pools, holding all used threads
std::vector<std::thread> filter_pool;
std::vector<std::thread> copy_pool;
std::vector<std::thread> agg_pool;
int thread_id = 0;
// cpus on node 2 (for sapphire rapids), that the threads should be executed on
std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(24, 36), std::make_pair(120, 132)};
// create all threads for all thread groups and for every task (copy, filter, aggregation), according their specific theadcount
for(uint32_t gid = 0; gid < THREAD_GROUP_MULTIPLIER; ++gid) {
for(uint32_t tid = 0; tid < tc_filter; ++tid) {
filter_pool.emplace_back(filter_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
pin_thread_in_range(filter_pool.back(), thread_id++, pinning_ranges);
}
for(uint32_t tid = 0; tid < tc_copy; ++tid) {
copy_pool.emplace_back(copy_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
pin_thread_in_range(copy_pool.back(), thread_id++, pinning_ranges);
}
for(uint32_t tid = 0; tid < tc_agg; ++tid) {
agg_pool.emplace_back(aggregation_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
pin_thread_in_range(agg_pool.back(), thread_id++, pinning_ranges);
}
}
// start the clock
auto start = std::chrono::steady_clock::now();
// set value to the promise, to signal the waiting threads, that they can start now
p.set_value();
// wait for all thread to be finished
for(std::thread& t : filter_pool) { t.join(); }
for(std::thread& t : copy_pool) { t.join(); }
for(std::thread& t : agg_pool) { t.join(); }
// sum up the results of all the aggregation threads to get a final result
Aggregation<base_t, Sum, load_mode::Aligned>::apply(&result, results, sizeof(base_t) * tc_agg * THREAD_GROUP_MULTIPLIER);
auto end = std::chrono::steady_clock::now();
// get the overall execution time in seconds
constexpr double nanos_per_second = ((double)1000) * 1000 * 1000;
uint64_t nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
exec_time = (double)(nanos) / nanos_per_second;
// calculate the checksum according to the used query
if constexpr (QUERY == 1) {
// QUERY == 1 -> simple query is applied
checksum = sum_check(compare_value_a, data_a, data_b, workload_b);
} else {
checksum = sum_check_complex(compare_value_a, compare_value_b, data_a, data_b, workload_b);
}
delete qw;
}
};

qdp_project/src/benchmark/doubly_filtered_agg.cpp (149 changed lines)

@@ -1,149 +0,0 @@
#include <cstring>
#include <fstream>
#include <future>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <numa.h>
#include "aggregation.h"
#include "array_utils.h"
#include "cpu_set_utils.h"
#include "file_output.h"
#include "iterable_range.h"
#include "memory_literals.h"
#include "pipelines/scan_filter_pipe.h"
int main () {
using base_t = uint64_t;
const size_t workload = 2_GiB;
const char filename[256] = "../results/doubly_filtered_results_stronger_affinity_.csv";
const uint32_t numa_local = 2;
const uint32_t numa_remote = 3;
Linear_Int_Range<uint32_t, 1, 6, 1> thread_group("thread_groups");
Exp_Int_Range<uint32_t, 1, 5, 2> thread_count_filter("thread_cnt_filter");
Exp_Int_Range<uint32_t, 1, 5, 2> thread_count_filter_copy("thread_cnt_filter_copy");
Exp_Int_Range<uint32_t, 1, 5, 2> thread_count_aggregation("thread_cnt_agg");
Linear_Int_Range<uint32_t, 0, 30, 1> run("run");
Range<PMode, no_copy, mode_manager, mode_manager> mode("mode");
Exp_Int_Range<size_t, 1_MiB, 8_MiB + 1, 2> chunk_size("chunk_size");
std::ofstream out_file;
out_file.open(filename);
print_to_file(out_file, generateHead(run, chunk_size, mode, thread_count_filter, thread_count_filter_copy,
thread_count_aggregation, thread_group), "time", "scan_a", "scan_b", "aggr_j", "wait_aggr", "results");
base_t* data_a = (base_t*) numa_alloc_onnode(workload, numa_remote);
base_t* data_b = (base_t*) numa_alloc_onnode(workload, numa_remote);
base_t* data_b_hbm = (base_t*) numa_alloc_onnode(workload, numa_local);
fill_mt<base_t>(data_a, workload, 0, 100, 42);
fill_mt<base_t>(data_b, workload, 0, 100, 420);
std::memcpy(data_b_hbm, data_b, workload);
base_t* result = (base_t*) numa_alloc_onnode(thread_group.max * thread_count_aggregation.max * sizeof(base_t),
numa_remote);
std::string iteration("init");
Query_Wrapper<base_t, false>* qw = nullptr;
while(iteration != "false") {
std::promise<void> p;
std::shared_future<void> ready_future(p.get_future());
if(iteration != "run") {
if(qw != nullptr) {
delete qw;
}
switch(mode.current) {
case PMode::expl_copy:
qw = new Query_Wrapper<base_t, false>(&ready_future, workload, chunk_size.current, data_a, data_b, result, numa_local, numa_remote,
thread_count_filter.current, thread_count_filter_copy.current, thread_count_aggregation.current,
mode.current, thread_group.current, (base_t) 50, (base_t) 42, false);
break;
case PMode::no_copy:
qw = new Query_Wrapper<base_t, false>(&ready_future, workload, chunk_size.current, data_a, data_b, result, numa_local, numa_remote,
thread_count_filter.current, thread_count_filter_copy.current, thread_count_aggregation.current,
mode.current, thread_group.current, (base_t) 50, (base_t) 42, true);
break;
case PMode::hbm:
qw = new Query_Wrapper<base_t, false>(&ready_future, workload, chunk_size.current, data_a, data_b_hbm, result, numa_local, numa_remote,
thread_count_filter.current, thread_count_filter_copy.current, thread_count_aggregation.current,
mode.current, thread_group.current, (base_t) 50, (base_t) 42, true);
break;
}
}
qw->ready_future = &ready_future;
qw->clear_buffers();
// todo create threads depending on mode
std::vector<std::thread> thread_pool;
auto filter_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_a(gid, gcnt, tid); };
auto filter_copy_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_b(gid, gcnt, tid); };
auto aggregation_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->aggr_j(gid, gcnt, tid); };
/* Intel Xeon Gold 6130 // todo implement different for 5120 -> fewer cpus
node 0 cpus: 0-15 64- 79
node 1 cpus: 16-31 80- 95
node 2 cpus: 32-47 96-111
node 3 cpus: 48-63 112-127
*/
int thread_id = 0;
std::vector<std::pair<int, int>> range {std::make_pair(0, 16), std::make_pair(64, 80)};
for(uint32_t gid = 0; gid < thread_group.current; ++gid) {
for(uint32_t tid = 0; tid < thread_count_filter.current; ++tid) {
thread_pool.emplace_back(filter_lambda, gid, thread_group.current, tid);
pin_thread_in_range(thread_pool.back(), thread_id++, range);
}
for(uint32_t tid = 0; tid < thread_count_filter_copy.current; ++tid) {
thread_pool.emplace_back(filter_copy_lambda, gid, thread_group.current, tid);
pin_thread_in_range(thread_pool.back(), thread_id++, range);
}
for(uint32_t tid = 0; tid < thread_count_aggregation.current; ++tid) {
thread_pool.emplace_back(aggregation_lambda, gid, thread_group.current, tid);
pin_thread_in_range(thread_pool.back(), thread_id++, range);
}
}
auto start = std::chrono::steady_clock::now();
p.set_value();
// wait for every thread to join
for(std::thread& t : thread_pool) t.join();
// aggregate all partial results
Aggregation<base_t, Sum, load_mode::Aligned>::apply(result, result,
sizeof(base_t) * thread_count_aggregation.current * thread_group.current);
auto end = std::chrono::steady_clock::now();
double duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end-start).count() / (double)1000000000;
//TODO add mode
print_to_file(out_file, run, chunk_size, mode_manager::string(mode.current), thread_count_filter,
thread_count_filter_copy, thread_count_aggregation, thread_group, duration,
qw->trt->summarize_time(0), qw->trt->summarize_time(1),
qw->trt->summarize_time(2), qw->trt->summarize_time(3), *result);
iteration = IterateOnce(run, chunk_size, mode, thread_count_filter, thread_count_filter_copy, thread_count_aggregation, thread_group);
}
auto end = std::chrono::system_clock::now();
std::time_t end_time = std::chrono::system_clock::to_time_t(end);
std::cout << "finished computation at " << std::ctime(&end_time) << std::endl;
print_to_file(out_file, std::ctime(&end_time));
}

qdp_project/src/benchmark/filter_aggregate_pipeline.cpp (184 changed lines)

@@ -1,184 +0,0 @@
#include <atomic>
#include <barrier>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <future>
#include <iostream>
#include <limits>
#include <list>
#include <mutex>
#include <queue>
#include <thread>
#include <tuple>
#include <utility>
#include <numa.h>
#include "const.h"
#include "file_output.h"
#include "array_utils.h"
#include "timer_utils.h"
#include "barrier_utils.h"
#include "cpu_set_utils.h"
#include "iterable_range.h"
#include "memory_literals.h"
#include "pipelines/scan_filter_pipe.h"
#include "aggregation.h"
#include "filter.h"
using base_t = uint64_t;
base_t sum_check(base_t compare_value, base_t* row_A, base_t* row_B, size_t row_size) {
base_t sum = 0;
for(int i = 0; i < row_size / sizeof(base_t); ++i) {
sum += (row_A[i] < compare_value) * row_B[i];
}
return sum;
}
int main(int argc, char** argv) {
size_t workload_b = 2_GiB;
std::ofstream out_file;
out_file.open("filter_aggreagate_pipe_bm_" + (std::string) BARRIER_MODE + ".csv");
Linear_Int_Range<uint32_t, 1, 7, 1> thread_group("thread_groups");
Linear_Int_Range<uint32_t, 0, 10, 1> run("run");
Exp_Int_Range<size_t, 1_MiB, 8_MiB + 1, 2> chunk_size("chunk_size");
Linear_Int_Range<uint32_t, 1, 2, 1> thread_count_filter("thread_cnt_filter");
Linear_Int_Range<uint32_t, 2, 3, 1> thread_count_copy("thread_cnt_copy");
Linear_Int_Range<uint32_t, 1, 2, 1> thread_count_aggregation("thread_cnt_agg");
Range<PMode, no_copy, mode_manager, mode_manager> mode("mode");
uint32_t remote_node = 2;
uint32_t remote_node_2 = 2;
uint32_t local_node = 10;
print_to_file(out_file, generateHead(run, chunk_size, mode, thread_count_filter, thread_count_copy,
thread_count_aggregation, thread_group), "time",
#ifdef THREAD_TIMINGS
"scan_a", "scan_b", "aggr_j",
#endif
#ifdef BARRIER_TIMINGS
"wait_scan_a", "wait_scan_b", "wait_aggr_j",
#endif
"result");
/*** alloc data and buffers ************************************************/
base_t* data_a = (base_t*) numa_alloc_onnode(workload_b, remote_node);
base_t* data_b = (base_t*) numa_alloc_onnode(workload_b, remote_node_2);
base_t* data_b_hbm = (base_t *) numa_alloc_onnode(workload_b, local_node);
fill_mt<base_t>(data_a, workload_b, 0, 100, 42);
fill_mt<base_t>(data_b, workload_b, 0, 100, 420);
std::memcpy(data_b_hbm, data_b, workload_b);
base_t* results = (base_t*) numa_alloc_onnode(thread_group.max * thread_count_aggregation.max * sizeof(base_t), remote_node);
std::string iteration("init");
const bool simple_query = true;
Query_Wrapper<base_t, simple_query>* qw = nullptr;
while(iteration != "false") {
base_t compare_value = 50;
std::promise<void> p;
std::shared_future<void> ready_future(p.get_future());
if(iteration != "run") {
if(qw != nullptr) {
delete qw;
}
std::cout << "Changing to mode " << mode.current << " chunksize " << chunk_size.current << " thread_group " << thread_group.current << std::endl;
switch(mode.current) {
case PMode::expl_copy:
qw = new Query_Wrapper<base_t, simple_query>(&ready_future, workload_b, chunk_size.current, data_a, data_b, results, local_node, remote_node,
thread_count_filter.current, thread_count_copy.current, thread_count_aggregation.current, mode.current, thread_group.current, (base_t) 50, (base_t) 42, false);
break;
case PMode::no_copy:
qw = new Query_Wrapper<base_t, simple_query>(&ready_future, workload_b, chunk_size.current, data_a, data_b, results, local_node, remote_node,
thread_count_filter.current, thread_count_copy.current, thread_count_aggregation.current, mode.current, thread_group.current, (base_t) 50, (base_t) 42, true);
break;
case PMode::hbm:
qw = new Query_Wrapper<base_t, simple_query>(&ready_future, workload_b, chunk_size.current, data_a, data_b_hbm, results, local_node, remote_node,
thread_count_filter.current, thread_count_copy.current, thread_count_aggregation.current, mode.current, thread_group.current, (base_t) 50, (base_t) 42, true);
break;
}
}
qw->ready_future = &ready_future;
qw->clear_buffers();
auto filter_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_a(gid, gcnt, tid); };
auto copy_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_b(gid, gcnt, tid); };
auto aggregation_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->aggr_j(gid, gcnt, tid); };
std::vector<std::thread> filter_pool;
std::vector<std::thread> copy_pool;
std::vector<std::thread> agg_pool;
int thread_id = 0;
// std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(28, 42), std::make_pair(84, 98)}; // node 2 heacboehm2
std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(32, 48), std::make_pair(96, 112)}; // node 2 heacboehm
for(uint32_t gid = 0; gid < thread_group.current; ++gid) {
for(uint32_t tid = 0; tid < thread_count_filter.current; ++tid) {
filter_pool.emplace_back(filter_lambda, gid, thread_group.current, tid);
pin_thread_in_range(filter_pool.back(), thread_id++, pinning_ranges);
}
if(mode.current == PMode::expl_copy){
for(uint32_t tid = 0; tid < thread_count_copy.current; ++tid) {
copy_pool.emplace_back(copy_lambda, gid, thread_group.current, tid);
pin_thread_in_range(copy_pool.back(), thread_id++, pinning_ranges);
}
}
for(uint32_t tid = 0; tid < thread_count_aggregation.current; ++tid) {
agg_pool.emplace_back(aggregation_lambda, gid, thread_group.current, tid);
pin_thread_in_range(agg_pool.back(), thread_id++, pinning_ranges);
}
}
auto start = std::chrono::steady_clock::now();
p.set_value();
for(std::thread& t : filter_pool) { t.join(); }
for(std::thread& t : copy_pool) { t.join(); }
for(std::thread& t : agg_pool) { t.join(); }
Aggregation<base_t, Sum, load_mode::Aligned>::apply(results, results, sizeof(base_t) * thread_count_aggregation.current * thread_group.current);
auto end = std::chrono::steady_clock::now();
constexpr double nanos_per_second = ((double)1000) * 1000 * 1000;
uint64_t nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
double seconds = (double)(nanos) / nanos_per_second;
print_to_file(out_file, run, chunk_size, mode_manager::string(mode.current), thread_count_filter,
thread_count_copy, thread_count_aggregation, thread_group, seconds,
#ifdef THREAD_TIMINGS
qw->trt->summarize_time(0), qw->trt->summarize_time(1), qw->trt->summarize_time(2),
#endif
#ifdef BARRIER_TIMINGS
qw->bt->summarize_time(0), qw->bt->summarize_time(1), qw->bt->summarize_time(2),
#endif
results[0]);
iteration = IterateOnce(run, chunk_size, mode, thread_count_filter, thread_count_copy, thread_count_aggregation, thread_group);
}
numa_free(data_b_hbm, workload_b);
numa_free(data_a, workload_b);
numa_free(data_b, workload_b);
numa_free(results, thread_group.max * sizeof(base_t));
}

qdp_project/src/benchmark/latency.cpp (188 changed lines)

@@ -1,188 +0,0 @@
/*
* numa_memory_latency
* Copyright (c) 2017 UMEZAWA Takeshi
* This software is licensed under GNU GPL version 2 or later.
*
* This file has been modified
*/
#include <algorithm>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <iostream>
#include <unistd.h>
#include <ctime>
#include "file_output.h"
#include <vector>
#include <random>
#include <algorithm>
#include <numa.h>
#ifndef VOLATILE
#define VOLATILE 0
#endif
#define cachelinesize 64
union CACHELINE {
char cacheline[cachelinesize];
#if VOLATILE
volatile CACHELINE* next;
#else
CACHELINE* next;
#endif /*VOLATILE*/
};
#define REPT4(x) do { x; x; x; x; } while(0)
#define REPT16(x) do { REPT4(x); REPT4(x); REPT4(x); REPT4(x); } while(0);
#define REPT64(x) do { REPT16(x); REPT16(x); REPT16(x); REPT16(x); } while(0);
#define REPT256(x) do { REPT64(x); REPT64(x); REPT64(x); REPT64(x); } while(0);
#define REPT1024(x) do { REPT256(x); REPT256(x); REPT256(x); REPT256(x); } while(0);
size_t bufsize = 1 * 1024 * 1024 * 1024;
size_t nloop = 128 * 1024;
std::vector<size_t> offsets;
#if VOLATILE
volatile CACHELINE* walk(volatile CACHELINE* start)
{
volatile CACHELINE* p = start;
for (size_t i = 0; i < nloop; ++i) {
REPT1024(p = p->next);
}
return p;
}
#else
CACHELINE* walk(CACHELINE* start, uint64_t* sum)
{
CACHELINE* p = start;
for (size_t i = 0; i < nloop; ++i) {
REPT1024(
*sum += static_cast<uint64_t>(p->cacheline[cachelinesize-1]);
p = p->next;
);
}
return p;
}
#endif /*VOLATILE*/
void bench(int tasknode, int memnode, std::ofstream* out_file)
{
struct timespec ts_begin, ts_end, ts_elapsed;
printf("bench(task=%d, mem=%d)\n", tasknode, memnode);
if (numa_run_on_node(tasknode) != 0) {
printf("failed to run on node: %s\n", strerror(errno));
return;
}
CACHELINE* const buf = (CACHELINE*)numa_alloc_onnode(bufsize, memnode);
if (buf == NULL) {
printf("failed to allocate memory\n");
return;
}
for (size_t i = 0; i < offsets.size() - 1; ++i) {
// assuming that next-pointer never overwrites last Byte of the cacheline/union
buf[offsets[i]].cacheline[cachelinesize-1] = offsets[i] % 128;
buf[offsets[i]].next = buf + offsets[i+1];
}
buf[offsets[offsets.size() - 1]].next = buf;
buf[offsets[offsets.size() - 1]].cacheline[cachelinesize-1] = offsets[offsets.size() - 1] % 128;
uint64_t value = 0;
uint64_t* sum = &value;
clock_gettime(CLOCK_MONOTONIC, &ts_begin);
#if VOLATILE
walk(buf);
#else
walk(buf, sum);
#endif /*VOLATILE*/
clock_gettime(CLOCK_MONOTONIC, &ts_end);
ts_elapsed.tv_nsec = ts_end.tv_nsec - ts_begin.tv_nsec;
ts_elapsed.tv_sec = ts_end.tv_sec - ts_begin.tv_sec;
if (ts_elapsed.tv_nsec < 0) {
--ts_elapsed.tv_sec;
ts_elapsed.tv_nsec += 1000*1000*1000;
}
double elapsed = ts_elapsed.tv_sec + 0.000000001 * ts_elapsed.tv_nsec;
printf("took %fsec. %fns/load\n", elapsed, elapsed/(1024*nloop)*(1000*1000*1000));
print_to_file(*out_file, tasknode, memnode, elapsed/(1024*nloop)*(1000*1000*1000), *sum);
numa_free(buf, bufsize);
}
struct RND {
std::mt19937 mt;
RND() : mt(time(NULL)) {}
std::mt19937::result_type operator()(std::mt19937::result_type n) { return mt() % n; }
} r;
void usage(const char* prog)
{
printf("usage: %s [-h] [bufsize] [nloop]\n", prog);
}
int main(int argc, char* argv[])
{
int ch;
while ((ch = getopt(argc, argv, "h")) != -1) {
switch (ch) {
case 'h':
default:
usage(argv[0]);
exit(1);
}
}
argc -= optind;
argv += optind;
if (argc > 1) {
// 1048576 KiB = 1 GiB
bufsize = atoi(argv[0]) * 1024; // in KiB
nloop = atoi(argv[1]) * 1024;
}
offsets.resize(bufsize / cachelinesize);
for (size_t i = 0; i < offsets.size(); ++i)
offsets[i] = i;
std::random_shuffle(offsets.begin() + 1, offsets.end(), r);
uint64_t expected_checksum = 0;
#if VOLATILE == 0
for (size_t i = 0; i < nloop * 1024; ++i) {
expected_checksum += offsets[i % offsets.size()] % 128;
}
#endif
std::ofstream check_file;
check_file.open("../results/micro_bench/latency/micro_bench_latency_" + (std::string)(VOLATILE == 1 ? "volatile" : "sum") + ".checksum");
check_file << expected_checksum;
check_file.close();
printf("benchmark bufsize=%zuKiB, nloop=%zuKi\n", bufsize/1024, nloop/1024);
std::ofstream out_file;
out_file.open("../results/micro_bench/latency/micro_bench_latency_"+ (std::string)(VOLATILE == 1 ? "volatile" : "sum") + ".csv");
print_to_file(out_file, "tasknode", "memnode", "latency", "checksum");
for (int tasknode = 0; tasknode < 8; tasknode++) {
for (int memnode = 0; memnode < 16; memnode++) {
bench(tasknode, memnode, &out_file);
}
}
return 0;
}

qdp_project/src/benchmark/micro_benchmarks.cpp (271 changed lines)

@@ -1,271 +0,0 @@
#include <iostream>
#include <chrono>
#include <future>
#include <numa.h>
#include <algorithm>
#include <cstring>
#include "memory_literals.h"
#include "array_utils.h"
#include "file_output.h"
#include "aggregation.h"
using base_t = uint64_t;
size_t thread_cnt_memcpy = 128;
size_t thread_cnt_read = 128;
size_t runs = 10;
base_t sum_up(base_t* data, size_t workload){
base_t sum = 0;
for(int i = 0; i < workload/sizeof(base_t); i++){
sum += data[i];
}
return sum;
}
int reverse_bits(int number, size_t bit_count) {
int result = 0;
for(int i = 0; i < bit_count; i++) {
result <<= 1;
result |= (number & 1);
number >>= 1;
}
return result;
}
double measure_memcpy_bw(base_t* src, base_t* dest, size_t workload, base_t* result){
std::promise<void> p;
std::shared_future<void> ready_future(p.get_future());
auto thread_lambda = [&](base_t* source, base_t* destination, size_t count) {
ready_future.wait();
memcpy(destination, source, count);
};
std::vector<std::thread> thread_pool;
size_t total_elements = workload / sizeof(base_t);
size_t elements_per_thread = total_elements / thread_cnt_memcpy;
size_t remainder = total_elements % thread_cnt_memcpy;
for(size_t tid = 0; tid < thread_cnt_memcpy; tid++) {
size_t elements_to_process = elements_per_thread + (tid < remainder ? 1 : 0);
size_t byte_offset = (elements_per_thread * tid + std::min(tid, remainder)) * sizeof(base_t);
thread_pool.emplace_back(thread_lambda, src + byte_offset / sizeof(base_t), dest + byte_offset / sizeof(base_t), elements_to_process * sizeof(base_t));
}
auto start = std::chrono::steady_clock::now();
p.set_value();
for(std::thread& t : thread_pool) { t.join(); }
auto stop = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start);
double seconds = duration.count() / 1e9;
double throughput = (workload / seconds) / (1024 * 1024 * 1024);
*result = sum_up(dest, workload);
return throughput;
}
double measure_read_bw(base_t* data, size_t workload, base_t* results){
const size_t chunk_size = sizeof(__m512i);
const size_t num_chunks = (workload) / chunk_size;
__m512i* src = reinterpret_cast<__m512i*>(data);
std::promise<void> p;
std::shared_future<void> ready_future(p.get_future());
size_t num_chunks_per_thread = num_chunks / thread_cnt_read;
size_t num_chunks_remainder = num_chunks % thread_cnt_read;
auto thread_lambda = [&](__m512i* src, int tid, int num_chunks) {
__m512i accumulator = _mm512_setzero_si512();
ready_future.wait();
for (int i = 0; i < num_chunks; i++) {
__m512i chunk = _mm512_load_si512(&src[i]);
accumulator = _mm512_add_epi64(accumulator, chunk);
}
results[tid] = _mm512_reduce_add_epi64(accumulator);
};
std::vector<std::thread> thread_pool;
int offset;
for(int tid = 0; tid < thread_cnt_read; tid++){
if(tid < num_chunks_remainder){
offset = tid * (num_chunks_per_thread + 1);
thread_pool.emplace_back(thread_lambda, &src[offset], tid, (num_chunks_per_thread + 1));
} else {
offset = tid*num_chunks_per_thread + num_chunks_remainder;
thread_pool.emplace_back(thread_lambda, &src[offset], tid, num_chunks_per_thread);
}
}
auto start = std::chrono::steady_clock::now();
p.set_value();
for(std::thread& t : thread_pool) { t.join(); }
auto stop = std::chrono::steady_clock::now();
Aggregation<base_t, Sum, load_mode::Aligned>::apply(results, results, sizeof(base_t) * thread_cnt_read);
auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start);
double seconds = duration.count() / 1e9;
double throughput = (workload / seconds) / (1024 * 1024 * 1024);
return throughput;
}
void exec_multiple_runs_memcpy(size_t workload, int exec_node, int src_node, int dest_node, std::ofstream* out_file, std::string iteration_type){
base_t value;
base_t* result = &value;
base_t* src = (base_t*) numa_alloc_onnode(workload, src_node);
base_t* dest = (base_t*) numa_alloc_onnode(workload, dest_node);
fill_mt<base_t>(src, workload, 0, 100, 42);
fill_mt<base_t>(dest, workload, 0, 100, 12);
numa_run_on_node(exec_node);
if(dest_node == 0 && src_node == 0){
std::ofstream check_file;
check_file.open("../results/micro_bench/micro_bench_bw_memcpy_execnode_" + std::to_string(exec_node)
+ "_threadcnt_" + std::to_string(thread_cnt_memcpy) + "_" + iteration_type + ".checksum");
check_file << sum_up(src, workload);
check_file.close();
}
for(size_t run = 0; run < runs; run++){
double bw = measure_memcpy_bw(src, dest, workload, result);
std::cout << "Copy throughput executed on node " << exec_node << " form node " << src_node << " to node "
<< dest_node << ": " << bw << " GiB/s" << std::endl;
print_to_file(*out_file, run, src_node, dest_node, bw, *result);
std::memset(dest, 0x00, workload);
*result = 0;
}
numa_free(src, workload);
numa_free(dest, workload);
}
void measure_all_memcpy_bw_for_chosen_execnode(int exec_node){
std::ofstream out_file;
out_file.open("../results/micro_bench/micro_bench_bw_memcpy_execnode_" + std::to_string(exec_node)
+ "_threadcnt_" + std::to_string(thread_cnt_memcpy) + ".csv");
print_to_file(out_file, "run", "src_node", "dest_node", "bw", "result");
const size_t workload = 4_GiB;
for(int src_node = 0; src_node < 16; src_node++){
for(int dest_node = 0; dest_node < 16; dest_node++){
exec_multiple_runs_memcpy(workload, exec_node, src_node, dest_node, &out_file, "");
}
}
out_file.close();
}
void measure_all_memcpy_bw_for_chosen_execnode_reversed(int exec_node){
std::ofstream out_file;
out_file.open("../results/micro_bench/micro_bench_bw_memcpy_execnode_" + std::to_string(exec_node)
+ "_threadcnt_" + std::to_string(thread_cnt_memcpy) + "_reversed.csv");
print_to_file(out_file, "run", "src_node", "dest_node", "bw", "result");
const size_t workload = 4_GiB;
for(int src_node = 15; src_node >= 0; src_node--){
for(int dest_node = 15; dest_node >= 0; dest_node--){
exec_multiple_runs_memcpy(workload, exec_node, src_node, dest_node, &out_file, "reversed");
}
}
out_file.close();
}
void measure_all_memcpy_bw_for_chosen_execnode_reversed_bitwise(int exec_node){
std::ofstream out_file;
out_file.open("../results/micro_bench/micro_bench_bw_memcpy_execnode_" + std::to_string(exec_node)
+ "_threadcnt_" + std::to_string(thread_cnt_memcpy) + "_reversed_bitwise.csv");
print_to_file(out_file, "run", "src_node", "dest_node", "bw", "result");
const size_t workload = 4_GiB;
for(int src_node = 0; src_node < 16; src_node++){
for(int dest_node = 0; dest_node < 16; dest_node++){
int reversed_src_node = reverse_bits(src_node, 4);
int reversed_dest_node = reverse_bits(dest_node, 4);
exec_multiple_runs_memcpy(workload, exec_node, reversed_src_node, reversed_dest_node, &out_file, "reversed_bitwise");
}
}
out_file.close();
}
void exec_multiple_runs_read(size_t workload, int mem_node, int exec_node, std::ofstream *out_file, std::string iteration_type){
base_t* data = (base_t*) numa_alloc_onnode(workload, mem_node);
fill_mt<base_t>(data, workload, 0, 100, 42);
base_t* results = (base_t*) numa_alloc_onnode(thread_cnt_read * sizeof(base_t), exec_node);
numa_run_on_node(exec_node);
if(mem_node == 0 && exec_node == 0){
std::ofstream check_file;
check_file.open("../results/micro_bench/micro_bench_bw_read_threadcnt_" + std::to_string(thread_cnt_read) + "_" + iteration_type + ".checksum");
check_file << sum_up(data, workload);
check_file.close();
}
for(size_t run = 0; run < runs; run++){
double bw = measure_read_bw(data, workload, results);
std::cout << "Read throughput executed on node " << exec_node << " for node " << mem_node << ": " << bw << " GiB/s" << std::endl;
print_to_file(*out_file, run, exec_node, mem_node, bw, results[0]);
std::memset(results, 0x00, thread_cnt_read * sizeof(base_t));
}
numa_free(data, workload);
numa_free(results, thread_cnt_read * sizeof(base_t));
}
void measure_all_read_bw(){
std::ofstream out_file;
out_file.open("../results/micro_bench/micro_bench_bw_read_threadcnt_" + std::to_string(thread_cnt_read) + ".csv");
print_to_file(out_file, "run", "exec_node", "mem_node", "bw", "result");
const size_t workload = 8_GiB;
for(int exec_node = 0; exec_node < 8; exec_node++){
for(int mem_node = 0; mem_node < 16; mem_node++){
exec_multiple_runs_read(workload, mem_node, exec_node, &out_file, "");
}
}
out_file.close();
}
void measure_all_read_bw_reversed(){
std::ofstream out_file;
out_file.open("../results/micro_bench/micro_bench_bw_read_threadcnt_" + std::to_string(thread_cnt_read) + "_reversed.csv");
print_to_file(out_file, "run", "exec_node", "mem_node", "bw", "result");
const size_t workload = 8_GiB;
for(int exec_node = 7; exec_node >= 0; exec_node--){
for(int mem_node = 15; mem_node >= 0; mem_node--){
exec_multiple_runs_read(workload, mem_node, exec_node, &out_file, "reversed");
}
}
out_file.close();
}
void measure_all_read_bw_reversed_bitwise(){
std::ofstream out_file;
out_file.open("../results/micro_bench/micro_bench_bw_read_threadcnt_" + std::to_string(thread_cnt_read) + "_reversed_bitwise.csv");
print_to_file(out_file, "run", "exec_node", "mem_node", "bw", "result");
const size_t workload = 8_GiB;
for(int exec_node0 = 0; exec_node0 < 8; exec_node0++){
for(int mem_node0 = 0; mem_node0 < 16; mem_node0++){
int mem_node = reverse_bits(mem_node0, 4);
int exec_node = reverse_bits(exec_node0, 3);
exec_multiple_runs_read(workload, mem_node, exec_node, &out_file, "reversed_bitwise");
}
}
out_file.close();
}
int main() {
// nodes 0-7 hold cores and DRAM, nodes 8-15 only HBM
measure_all_read_bw_reversed_bitwise();
measure_all_memcpy_bw_for_chosen_execnode_reversed_bitwise(0);
return 0;
}
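Note on the removed micro-benchmark: it sweeps source and destination nodes in bit-reversed order (via reverse_bits above) so that consecutive measurements do not always hit neighbouring nodes back to back. A minimal standalone sketch of the visiting order this produces, assuming the same 4-bit node numbering as in the comment above (nodes 0-7 DRAM, 8-15 HBM):

#include <cstdio>

// helper adapted from the removed micro_benchmarks.cpp
static int reverse_bits(int number, int bit_count) {
    int result = 0;
    for (int i = 0; i < bit_count; i++) {
        result <<= 1;
        result |= (number & 1);
        number >>= 1;
    }
    return result;
}

int main() {
    // prints 0 8 4 12 2 10 6 14 1 9 5 13 3 11 7 15:
    // DRAM and HBM nodes alternate instead of being visited in two blocks
    for (int node = 0; node < 16; node++)
        printf("%d ", reverse_bits(node, 4));
    printf("\n");
    return 0;
}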

391
qdp_project/src/benchmark/pipelines/DIMES_scan_filter_pipe.h

@@ -1,391 +0,0 @@
#include <cassert>
#include <mutex>
#include <cstring>
#include <bitset>
#include <numa.h>
#include "filter.h"
#include "aggregation.h"
#include "vector_loader.h"
#include "timer_utils.h"
#include "barrier_utils.h"
#include "execution_modes.h"
template<typename base_t, bool simple>
class Query_Wrapper {
public:
// sync
std::shared_future<void>* ready_future;
thread_runtime_timing* trt;
barrier_timing* bt;
private:
// numa
uint32_t close_mem;
uint32_t far_mem;
// data
size_t size_b;
size_t chunk_size_b;
size_t chunk_size_w;
size_t chunk_cnt;
base_t* data_a;
base_t* data_b;
base_t* dest;
// ratios
uint32_t thread_count_fc;
uint32_t thread_count_fi;
uint32_t thread_count_ag;
uint32_t thread_group;
// done bits
volatile uint8_t* ready_flag_a;
volatile uint8_t* ready_flag_b;
std::mutex ready_a_m;
std::mutex ready_b_m;
// buffer
uint16_t* mask_a;
uint16_t* mask_b;
base_t** buffer_b;
// params
base_t cmp_a;
base_t cmp_b;
bool no_copy;
NewPMode mode;
// sync
std::unique_ptr<std::vector<std::barrier<barrier_completion_function>*>> sync_barrier;
std::string barrier_mode = BARRIER_MODE;
using filterCopy = Filter<base_t, LT, load_mode::Stream, true>;
using filterNoCopy = Filter<base_t, LT, load_mode::Stream, false>;
using filter = Filter<base_t, LT, load_mode::Stream, false>;
using aggregation = Aggregation<base_t, Sum, load_mode::Stream>;
public:
Query_Wrapper(std::shared_future<void>* rdy_fut, size_t workload_b, size_t chunk_size_b, base_t* data_a,
base_t* data_b, base_t* dest, uint32_t numa_close, uint32_t numa_far, uint32_t tc_fi, uint32_t tc_fc, uint32_t tc_ag,
NewPMode mode, uint32_t thread_group, base_t cmp_a = 50, base_t cmp_b = 42, bool no_copy = false) :
ready_future(rdy_fut), size_b(workload_b), chunk_size_b(chunk_size_b), data_a(data_a), data_b(data_b),
dest(dest), close_mem(numa_close), far_mem(numa_far), mode(mode), thread_group(thread_group), cmp_a(cmp_a), cmp_b(cmp_b), no_copy(no_copy){
chunk_size_w = chunk_size_b / sizeof(base_t);
chunk_cnt = size_b / chunk_size_b;
thread_count_fi = tc_fi;
thread_count_fc = tc_fc;
thread_count_ag = tc_ag;
ready_flag_a = (volatile uint8_t *) numa_alloc_onnode(
chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0), close_mem);
ready_flag_b = (volatile uint8_t *) numa_alloc_onnode(
chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0), close_mem);
mask_a = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), close_mem);
mask_b = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), close_mem);
trt = new thread_runtime_timing(4, 16*4*4*4, close_mem);
bt = new barrier_timing(4, 16*4*4*4, close_mem);
reset_barriers();
if constexpr(BUFFER_LIMIT==1) {
// TODO size ok like that?
buffer_b = (base_t**) numa_alloc_onnode(size_b * sizeof(base_t*), close_mem);
buffer_b[0] = (base_t*) numa_alloc_onnode(thread_group * chunk_size_b, close_mem);
buffer_b[1] = (base_t*) numa_alloc_onnode(thread_group * chunk_size_b, close_mem);
} else {
buffer_b = (base_t **) numa_alloc_onnode(sizeof(base_t*), close_mem);
base_t* buffer_tmp = (base_t *) numa_alloc_onnode(size_b, close_mem);
*buffer_b = buffer_tmp;
}
};
void reset_barriers(){
if(sync_barrier != nullptr) {
for(auto& barrier : *sync_barrier) {
delete barrier;
}
sync_barrier.reset();
}
sync_barrier = std::make_unique<std::vector<std::barrier<barrier_completion_function>*>>(thread_group);
uint32_t thread_count_sum = thread_count_ag + thread_count_fi + thread_count_fc;
uint32_t barrier_count = barrier_mode.compare("global") == 0 ? 1 : thread_group;
uint32_t barrier_thread_count;
if constexpr(simple){
barrier_thread_count = (thread_group / barrier_count) *
(mode == NewPMode::Prefetch ? thread_count_sum : (thread_count_ag + thread_count_fi));
} else {
barrier_thread_count = (thread_group / barrier_count) * thread_count_sum;
}
for(uint32_t i = 0; i < barrier_count; ++i) {
(*sync_barrier)[i] = new std::barrier<barrier_completion_function>(barrier_thread_count);
}
}
void clear_buffers () {
std::memset((void*)ready_flag_a, 0x00, chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0));
std::memset((void*)ready_flag_b, 0x00, chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0));
std::memset(mask_a, 0x00, size_b / sizeof(base_t));
std::memset(mask_b, 0x00, size_b / sizeof(base_t));
if constexpr(BUFFER_LIMIT==1) {
std::memset(buffer_b[0], 0x00, thread_group * chunk_size_b);
std::memset(buffer_b[1], 0x00, thread_group * chunk_size_b);
} else {
std::memset(*buffer_b, 0x00, size_b);
}
trt->reset_accumulator();
bt->reset_accumulator();
reset_barriers();
};
~Query_Wrapper() {
numa_free((void*)ready_flag_a,
chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0));
numa_free((void*)ready_flag_b,
chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0));
numa_free(mask_a, size_b / sizeof(base_t));
numa_free(mask_b, size_b / sizeof(base_t));
if constexpr(BUFFER_LIMIT==1) {
numa_free(buffer_b[0], thread_group * chunk_size_b);
numa_free(buffer_b[1], thread_group * chunk_size_b);
numa_free(buffer_b, size_b * sizeof(base_t*));
} else {
numa_free(*buffer_b, size_b);
}
delete trt;
for(auto& barrier : *sync_barrier) {
delete barrier;
}
delete bt;
};
//this can be set without need to change allocations
void set_thread_group_count(uint32_t value) {
this->thread_group = value;
};
private:
static inline base_t* get_sub_chunk_ptr(base_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid,
size_t tcnt) {
base_t* chunk_ptr = base_ptr + chunk_id * chunk_size_w;
return chunk_ptr + tid * (chunk_size_w / tcnt);
}
static inline uint16_t* get_sub_mask_ptr(uint16_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid,
size_t tcnt) {
// 16 integer are addressed with one uint16_t in mask buffer
size_t offset = chunk_id * chunk_size_w + tid * (chunk_size_w / tcnt);
return base_ptr + (offset / 16);
}
static bool bit_at(volatile uint8_t* bitmap, uint32_t bitpos) {
uint8_t value = bitmap[bitpos / 8];
switch(bitpos % 8) {
case 0: return value & 0b00000001;
case 1: return value & 0b00000010;
case 2: return value & 0b00000100;
case 3: return value & 0b00001000;
case 4: return value & 0b00010000;
case 5: return value & 0b00100000;
case 6: return value & 0b01000000;
case 7: return value & 0b10000000;
default: return false;
}
}
static void set_bit_at(volatile uint8_t* bitmap, std::mutex& mutex, uint32_t bitpos) {
mutex.lock();
switch(bitpos % 8) {
case 0: bitmap[bitpos / 8] |= 0b00000001;break;
case 1: bitmap[bitpos / 8] |= 0b00000010;break;
case 2: bitmap[bitpos / 8] |= 0b00000100;break;
case 3: bitmap[bitpos / 8] |= 0b00001000;break;
case 4: bitmap[bitpos / 8] |= 0b00010000;break;
case 5: bitmap[bitpos / 8] |= 0b00100000;break;
case 6: bitmap[bitpos / 8] |= 0b01000000;break;
case 7: bitmap[bitpos / 8] |= 0b10000000;break;
}
mutex.unlock();
}
public:
static base_t checksum(base_t* a, base_t* b, base_t cmp_a, base_t cmp_b, size_t size_b) {
base_t sum = 0;
for(int i = 0; i < size_b / sizeof(base_t); ++i) {
if(a[i] >= cmp_a && b[i] <= cmp_b) {
sum += b[i];
}
}
return sum;
}
static void checkmask(uint16_t* mask, base_t cmp, base_t* data, size_t size_b, bool leq) {
uint32_t cnt = 0;
for(int i = 0; i < size_b / sizeof(base_t); ++i) {
if(leq) {
if(((data[i] <= cmp) != bit_at((uint8_t*)mask, i))) {
++cnt;
}
} else {
if(((data[i] >= cmp) != bit_at((uint8_t*)mask, i))) {
++cnt;
}
}
}
}
static void checkmask_16(uint16_t* mask, base_t cmp, base_t* data, size_t size_b, bool leq) {
for(int i = 0; i < size_b / sizeof(base_t) / 16 ; ++i) {
std::bitset<16> m(mask[i]);
uint16_t ch = 0;
for(int j = 0; j < 16; ++j) {
if(data[i*16 + j] <= cmp) {
ch |= 0x1 << j;
}
}
std::bitset<16> c(ch);
std::cout << "act " << m << std::endl;
std::cout << "rea " << c << std::endl << std::endl;
}
}
void scan_b(size_t gid, size_t gcnt, size_t tid) {
size_t tcnt = thread_count_fc;
assert(chunk_size_w % tcnt == 0);
assert(chunk_size_w % 16 == 0);
assert(chunk_size_w % tcnt * 16 == 0);
// wait till everyone can start
ready_future->wait();
// the lower gids run once more if the chunks are not evenly distributable
uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid);
uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid;
for(uint32_t i = 0; i < runs; ++i) {
trt->start_timer(1, tid * gcnt + gid);
// calculate pointers
size_t chunk_id = gid + gcnt * i;
base_t* chunk_ptr = get_sub_chunk_ptr(data_b , chunk_id, chunk_size_w, tid, tcnt);
uint16_t* mask_ptr = get_sub_mask_ptr (mask_b , chunk_id, chunk_size_w, tid, tcnt);
if constexpr(simple){
base_t* buffer_ptr;
if constexpr(BUFFER_LIMIT==1) {
buffer_ptr = get_sub_chunk_ptr(buffer_b[i % 2], gid, chunk_size_w, tid, tcnt);
} else {
buffer_ptr = get_sub_chunk_ptr(*buffer_b, chunk_id, chunk_size_w, tid, tcnt);
}
std::memcpy(buffer_ptr, chunk_ptr, chunk_size_b / tcnt);
} else {
if(no_copy) {
filterNoCopy::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_b, chunk_size_b / tcnt);
} else {
base_t* buffer_ptr;
if constexpr(BUFFER_LIMIT==1) {
buffer_ptr = get_sub_chunk_ptr(buffer_b[i % 2], gid, chunk_size_w, tid, tcnt);
} else {
buffer_ptr = get_sub_chunk_ptr(*buffer_b, chunk_id, chunk_size_w, tid, tcnt);
}
filterCopy::apply_same(mask_ptr, buffer_ptr, chunk_ptr, cmp_b, chunk_size_b / tcnt);
}
}
trt->stop_timer(1, tid * gcnt + gid);
bt->timed_wait(*(*sync_barrier)[barrier_idx], 1, tid * gcnt + gid);
}
(*(*sync_barrier)[barrier_idx]).arrive_and_drop();
}
void scan_a(size_t gid, size_t gcnt, size_t tid) {
size_t tcnt = thread_count_fi;
assert(chunk_size_w % tcnt == 0);
assert(chunk_size_w % 16 == 0);
assert(chunk_size_w % tcnt * 16 == 0);
// wait till everyone can start
ready_future->wait();
// the lower gids run once more if the chunks are not evenly distributable
uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid);
uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid;
for(uint32_t i = 0; i < runs; ++i) {
trt->start_timer(0, tid * gcnt + gid);
// calculate pointers
size_t chunk_id = gid + gcnt * i;
base_t* chunk_ptr = get_sub_chunk_ptr(data_a, chunk_id, chunk_size_w, tid, tcnt);
uint16_t* mask_ptr = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt);
filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt);
trt->stop_timer(0, tid * gcnt + gid);
bt->timed_wait(*(*sync_barrier)[barrier_idx], 0, tid * gcnt + gid);
}
(*(*sync_barrier)[barrier_idx]).arrive_and_drop();
}
void aggr_j(size_t gid, size_t gcnt, size_t tid) {
size_t tcnt = thread_count_ag;
// wait till everyone can start
ready_future->wait();
// calculate values
__m512i aggregator = aggregation::OP::zero();
// the lower gids run once more if the chunks are not evenly distributable
uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid);
uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid;
for(uint32_t i = 0; i < runs; ++i) {
bt->timed_wait(*(*sync_barrier)[barrier_idx], 2, tid * gcnt + gid);
trt->start_timer(2, tid * gcnt + gid);
// calculate pointers
size_t chunk_id = gid + gcnt * i;
base_t* chunk_ptr;
if(no_copy) {
chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt);
} else {
if constexpr(BUFFER_LIMIT==1) {
chunk_ptr = get_sub_chunk_ptr(buffer_b[i % 2], gid, chunk_size_w, tid, tcnt);
} else {
chunk_ptr = get_sub_chunk_ptr(*buffer_b, chunk_id, chunk_size_w, tid, tcnt);
}
}
uint16_t* mask_ptr_a = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt);
uint16_t* mask_ptr_b = get_sub_mask_ptr (mask_b, chunk_id, chunk_size_w, tid, tcnt);
base_t tmp = _mm512_reduce_add_epi64(aggregator);
if constexpr(simple){
aggregator = aggregation::apply_masked(aggregator, chunk_ptr, mask_ptr_a, chunk_size_b / tcnt);
} else {
aggregator = aggregation::apply_masked(aggregator, chunk_ptr, mask_ptr_a, mask_ptr_b, chunk_size_b / tcnt);
}
trt->stop_timer(2, tid * gcnt + gid);
}
// so threads with more runs dont wait for finished threads
(*(*sync_barrier)[barrier_idx]).arrive_and_drop();
aggregation::happly(dest + (tid * gcnt + gid), aggregator);
}
};
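For context: this removed pipeline, like the remaining MAX variant, splits every chunk among its worker threads with the same pointer arithmetic (get_sub_chunk_ptr / get_sub_mask_ptr), where one uint16_t in the mask buffer covers 16 data elements. A small self-contained sketch of that layout, assuming a 64 KiB chunk of uint64_t split across 4 threads (sizes chosen only for illustration):

#include <cstddef>
#include <cstdint>
#include <cstdio>

using base_t = uint64_t;

static base_t* get_sub_chunk_ptr(base_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) {
    base_t* chunk_ptr = base_ptr + chunk_id * chunk_size_w;
    return chunk_ptr + tid * (chunk_size_w / tcnt);
}

static uint16_t* get_sub_mask_ptr(uint16_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) {
    // 16 elements are covered by one uint16_t in the mask buffer
    size_t offset = chunk_id * chunk_size_w + tid * (chunk_size_w / tcnt);
    return base_ptr + (offset / 16);
}

int main() {
    const size_t chunk_size_b = 64 * 1024;                       // assumed chunk size
    const size_t chunk_size_w = chunk_size_b / sizeof(base_t);   // 8192 words per chunk
    const size_t tcnt = 4;                                       // assumed thread count
    static base_t data[2 * 8192] = {};                           // two chunks of data
    static uint16_t mask[2 * 8192 / 16] = {};                    // one mask bit per element
    for (size_t tid = 0; tid < tcnt; ++tid) {
        printf("chunk 1, tid %zu: data offset %zu words, mask offset %zu uint16_t\n",
               tid,
               (size_t)(get_sub_chunk_ptr(data, 1, chunk_size_w, tid, tcnt) - data),
               (size_t)(get_sub_mask_ptr(mask, 1, chunk_size_w, tid, tcnt) - mask));
    }
    return 0;
}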

199
qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h

@@ -15,9 +15,9 @@
 #include "measurement_utils.h"
 #include "execution_modes.h"
-#include "../../../thirdParty/dsa_offload/offloading-cacher/cache.hpp"
+#include "../../../../offloading-cacher/cache.hpp"
-template<typename base_t, bool simple>
+template<typename base_t, bool simple, bool cache_a, bool wait_b>
 class Query_Wrapper {
 public:
     // sync
@ -28,11 +28,9 @@ public:
pcm_value_collector* pvc; pcm_value_collector* pvc;
private: private:
dsacache::Cache cache_;
static constexpr size_t COPY_POLICY_MIN_SIZE = 64 * 1024 * 1024;
// numa
uint32_t close_mem;
uint32_t far_mem;
dsacache::Cache cache_;
// data // data
size_t size_b; size_t size_b;
@@ -47,13 +45,11 @@
     uint32_t thread_count_fc;
     uint32_t thread_count_fi;
     uint32_t thread_count_ag;
-    uint32_t thread_group;
+    uint32_t thread_count;
     // done bits
     volatile uint8_t* ready_flag_a;
     volatile uint8_t* ready_flag_b;
-    std::mutex ready_a_m;
-    std::mutex ready_b_m;
     // buffer
     uint16_t* mask_a;
@@ -73,70 +69,72 @@ private:
     using filter = Filter<base_t, LT, load_mode::Stream, false>;
     using aggregation = Aggregation<base_t, Sum, load_mode::Stream>;
-    void InitCache(const std::string& device) {
-        if (device == "default") {
-            static const auto cache_policy = [](const int numa_dst_node, const int numa_src_node, const size_t data_size) {
-                return numa_dst_node;
-            };
+    static int CachePlacementPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
+        return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node;
+    }
-            static const auto copy_policy = [](const int numa_dst_node, const int numa_src_node) {
-                return std::vector<int>{ numa_src_node, numa_dst_node };
-            };
+    static std::vector<int> CopyMethodPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
+        if (data_size < COPY_POLICY_MIN_SIZE) {
+            // if the data size is small then the copy will just be carried
+            // out by the destination node which does not require setting numa
+            // thread affinity as the selected dsa engine is already the one
+            // present on the calling thread
-            cache_.Init(cache_policy,copy_policy);
-        }
-        else if (device == "xeonmax") {
-            static const auto cache_policy = [](const int numa_dst_node, const int numa_src_node, const size_t data_size) {
-                return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node;
-            };
-            static const auto copy_policy = [](const int numa_dst_node, const int numa_src_node) {
-                const bool same_socket = ((numa_dst_node ^ numa_src_node) & 4) == 0;
-                if (same_socket) {
-                    const bool socket_number = numa_dst_node >> 2;
-                    if (socket_number == 0) return std::vector<int>{ 0, 1, 2, 3 };
-                    else return std::vector<int>{ 4, 5, 6, 7 };
-                }
-                else return std::vector<int>{ numa_src_node, numa_dst_node };
-            };
-            cache_.Init(cache_policy,copy_policy);
+            return std::vector<int>{ (numa_dst_node >= 8 ? numa_dst_node - 8 : numa_dst_node) };
         }
         else {
-            std::cerr << "Given device '" << device << "' not supported!" << std::endl;
-            exit(-1);
+            // for sufficiently large data, smart copy is used which will utilize
+            // all four engines for intra-socket copy operations and cross copy on
+            // the source and destination nodes for inter-socket copy
+            const bool same_socket = ((numa_dst_node ^ numa_src_node) & 4) == 0;
+            if (same_socket) {
+                const bool socket_number = numa_dst_node >> 2;
+                if (socket_number == 0) return std::vector<int>{ 0, 1, 2, 3 };
+                else return std::vector<int>{ 4, 5, 6, 7 };
+            }
+            else {
+                return std::vector<int>{
+                    (numa_src_node >= 8 ? numa_src_node - 8 : numa_src_node),
+                    (numa_dst_node >= 8 ? numa_dst_node - 8 : numa_dst_node)
+                };
+            }
         }
     }
 public:
-    Query_Wrapper(std::shared_future<void>* rdy_fut, size_t workload_b, size_t chunk_size_b, base_t* data_a,
-        base_t* data_b, base_t* dest, uint32_t numa_close, uint32_t numa_far, uint32_t tc_fi, uint32_t tc_fc, uint32_t tc_ag,
-        NewPMode mode, uint32_t thread_group, base_t cmp_a = 50, base_t cmp_b = 42) :
+    Query_Wrapper(std::shared_future<void>* rdy_fut, size_t workload_b, size_t chunk_size_b, base_t* data_a,
+        base_t* data_b, base_t* dest, uint32_t tc_fi, uint32_t tc_fc, uint32_t tc_ag,
+        NewPMode mode, base_t cmp_a = 50, base_t cmp_b = 42) :
         ready_future(rdy_fut), size_b(workload_b), chunk_size_b(chunk_size_b), data_a(data_a), data_b(data_b),
-        dest(dest), close_mem(numa_close), far_mem(numa_far), mode(mode), thread_group(thread_group), cmp_a(cmp_a), cmp_b(cmp_b){
+        dest(dest), mode(mode), cmp_a(cmp_a), cmp_b(cmp_b) {
+        const int current_cpu = sched_getcpu();
+        const int current_node = numa_node_of_cpu(current_cpu);
+        const int cache_node = CachePlacementPolicy(current_node, current_node, 0);
         chunk_size_w = chunk_size_b / sizeof(base_t);
         chunk_cnt = size_b / chunk_size_b;
         thread_count_fi = tc_fi;
         thread_count_fc = tc_fc;
         thread_count_ag = tc_ag;
-        ready_flag_a = (volatile uint8_t *) numa_alloc_onnode(
-            chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0), close_mem);
-        ready_flag_b = (volatile uint8_t *) numa_alloc_onnode(
-            chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0), close_mem);
+        thread_count = tc_fi + tc_fc + tc_ag;
+        ready_flag_a = (volatile uint8_t *) numa_alloc_onnode( chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0), cache_node);
+        ready_flag_b = (volatile uint8_t *) numa_alloc_onnode( chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0), cache_node);
-        mask_a = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), close_mem);
-        mask_b = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), close_mem);
+        mask_a = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), cache_node);
+        mask_b = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), cache_node);
-        InitCache("xeonmax");
+        cache_.Init(CachePlacementPolicy, CopyMethodPolicy);
-        size_t measurement_space = THREAD_GROUP_MULTIPLIER * std::max(std::max(tc_fi, tc_fc), tc_ag);
-        trt = new thread_runtime_timing(3, measurement_space, far_mem);
-        bt = new barrier_timing(3, measurement_space, far_mem);
-        pvc = new pcm_value_collector({"scan_a", "scan_b", "aggr_j"}, measurement_space, far_mem);
+        size_t measurement_space = std::max(std::max(tc_fi, tc_fc), tc_ag);
+        trt = new thread_runtime_timing(3, measurement_space, current_node);
+        bt = new barrier_timing(3, measurement_space, current_node);
+        pvc = new pcm_value_collector({"scan_a", "scan_b", "aggr_j"}, measurement_space, current_node);
         reset_barriers();
     };
@@ -148,16 +146,15 @@ public:
             sync_barrier.reset();
         }
-        sync_barrier = std::make_unique<std::vector<std::barrier<barrier_completion_function>*>>(thread_group);
+        sync_barrier = std::make_unique<std::vector<std::barrier<barrier_completion_function>*>>(thread_count);
         uint32_t thread_count_sum = thread_count_ag + thread_count_fi + thread_count_fc;
-        uint32_t barrier_count = barrier_mode.compare("global") == 0 ? 1 : thread_group;
+        uint32_t barrier_count = barrier_mode.compare("global") == 0 ? 1 : thread_count;
         uint32_t barrier_thread_count;
         if constexpr(simple){
-            barrier_thread_count = (thread_group / barrier_count) *
-                (mode == NewPMode::Prefetch ? thread_count_sum : (thread_count_ag + thread_count_fi));
+            barrier_thread_count = (thread_count / barrier_count) * (mode == NewPMode::Prefetch ? thread_count_sum : (thread_count_ag + thread_count_fi));
         } else {
-            barrier_thread_count = (thread_group / barrier_count) * thread_count_sum;
+            barrier_thread_count = (thread_count / barrier_count) * thread_count_sum;
         }
         for(uint32_t i = 0; i < barrier_count; ++i) {
             (*sync_barrier)[i] = new std::barrier<barrier_completion_function>(barrier_thread_count);
@@ -180,10 +177,8 @@ public:
     };
     ~Query_Wrapper() {
-        numa_free((void*)ready_flag_a,
-            chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0));
-        numa_free((void*)ready_flag_b,
-            chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0));
+        numa_free((void*)ready_flag_a, chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0));
+        numa_free((void*)ready_flag_b, chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0));
         numa_free(mask_a, size_b / sizeof(base_t));
         numa_free(mask_b, size_b / sizeof(base_t));
@@ -202,14 +197,12 @@ public:
     };
 private:
-    static inline base_t* get_sub_chunk_ptr(base_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid,
-        size_t tcnt) {
+    static inline base_t* get_sub_chunk_ptr(base_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) {
         base_t* chunk_ptr = base_ptr + chunk_id * chunk_size_w;
         return chunk_ptr + tid * (chunk_size_w / tcnt);
     }
-    static inline uint16_t* get_sub_mask_ptr(uint16_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid,
-        size_t tcnt) {
+    static inline uint16_t* get_sub_mask_ptr(uint16_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) {
         // 16 integer are addressed with one uint16_t in mask buffer
         size_t offset = chunk_id * chunk_size_w + tid * (chunk_size_w / tcnt);
         return base_ptr + (offset / 16);
@@ -258,6 +251,7 @@ public:
         // the lower gids run once more if the chunks are not evenly distributable
         uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid);
         uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid;
+
         for(uint32_t i = 0; i < runs; ++i) {
             trt->start_timer(1, tid * gcnt + gid);
             pvc->start("scan_b", tid * gcnt + gid);
@@ -268,28 +262,45 @@ public:
             uint16_t* mask_ptr = get_sub_mask_ptr(mask_b, chunk_id, chunk_size_w, tid, tcnt);
             if constexpr(simple){
-                cache_.Access(chunk_ptr, chunk_size_b / tcnt);
+                cache_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), chunk_size_b / tcnt);
             } else {
-                const auto data = cache_.Access(chunk_ptr, chunk_size_b / tcnt);
+                const auto data = cache_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), chunk_size_b / tcnt);
-                // wait on copy to complete - during this time other threads may
-                // continue with their calculation which leads to little impact
-                // and we will be faster if the cache is used
-                data->WaitOnCompletion();
-                // obtain the data location from the cache entry
-                base_t* data_ptr = data->GetDataLocation();
-                // nullptr is still a legal return value for CacheData::GetLocation()
-                // even after waiting, so this must be checked
-                if (data_ptr == nullptr) {
-                    data_ptr = chunk_ptr;
-                }
-                filterNoCopy::apply_same(mask_ptr, nullptr, data_ptr, cmp_b, chunk_size_b / tcnt);
+                if constexpr(wait_b) {
+                    // wait on copy to complete - during this time other threads may
+                    // continue with their calculation which leads to little impact
+                    // and we will be faster if the cache is used
+                    data->WaitOnCompletion();
+                    // obtain the data location from the cache entry
+                    base_t* data_ptr = reinterpret_cast<base_t*>(data->GetDataLocation());
+                    // nullptr is still a legal return value for CacheData::GetLocation()
+                    // even after waiting, so this must be checked
+                    if (data_ptr == nullptr) {
+                        std::cerr << "[!] Cache Miss in ScanB" << std::endl;
+                        data_ptr = chunk_ptr;
+                    }
+                    filterNoCopy::apply_same(mask_ptr, nullptr, data_ptr, cmp_b, chunk_size_b / tcnt);
+                }
+                else {
+                    // obtain the data location from the cache entry
+                    base_t* data_ptr = reinterpret_cast<base_t*>(data->GetDataLocation());
+                    // nullptr is still a legal return value for CacheData::GetLocation()
+                    // even after waiting, so this must be checked
+                    if (data_ptr == nullptr) {
+                        data_ptr = chunk_ptr;
+                    }
+                    filterNoCopy::apply_same(mask_ptr, nullptr, data_ptr, cmp_b, chunk_size_b / tcnt);
+                }
             }
             pvc->stop("scan_b", tid * gcnt + gid);
@@ -321,7 +332,21 @@ public:
             base_t* chunk_ptr = get_sub_chunk_ptr(data_a, chunk_id, chunk_size_w, tid, tcnt);
             uint16_t* mask_ptr = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt);
-            filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt);
+            if constexpr (cache_a) {
+                const auto data = cache_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), chunk_size_b / tcnt);
+                data->WaitOnCompletion();
+                base_t* data_ptr = reinterpret_cast<base_t*>(data->GetDataLocation());
+                if (data_ptr == nullptr) {
+                    std::cerr << "[!] Cache Miss in ScanA" << std::endl;
+                    data_ptr = chunk_ptr;
+                }
+                filter::apply_same(mask_ptr, nullptr, data_ptr, cmp_a, chunk_size_b / tcnt);
+            }
+            else {
+                filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt);
+            }
             pvc->stop("scan_a", tid * gcnt + gid);
             trt->stop_timer(0, tid * gcnt + gid);
@@ -340,19 +365,19 @@ public:
         // the lower gids run once more if the chunks are not evenly distributable
         uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid);
         uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid;
         for(uint32_t i = 0; i < runs; ++i) {
             bt->timed_wait(*(*sync_barrier)[barrier_idx], 2, tid * gcnt + gid);
             trt->start_timer(2, tid * gcnt + gid);
             pvc->start("aggr_j", tid * gcnt + gid);
             // calculate pointers
             size_t chunk_id = gid + gcnt * i;
-            const base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt);
+            base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt);
             // access the cache for the given chunk which will have been accessed in scan_b
-            const auto data = cache_.Access(chunk_ptr, chunk_size_b / tcnt);
+            const auto data = cache_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), chunk_size_b / tcnt);
             // wait on the caching task to complete, this will give time for other processes
             // to make progress here which will therefore not hurt performance
@@ -362,14 +387,14 @@ public:
             // after the copy task has finished we obtain the pointer to the cached
             // copy of data_b which is then used from now on
-            const base_t* data_ptr = data->GetDataLocation();
+            base_t* data_ptr = reinterpret_cast<base_t*>(data->GetDataLocation());
             // nullptr is still a legal return value for CacheData::GetLocation()
             // even after waiting, so this must be checked
             if (data_ptr == nullptr) {
                 data_ptr = chunk_ptr;
-                std::cerr << "Cache Miss" << std::endl;
+                std::cerr << "[!] Cache Miss in AggrJ" << std::endl;
             }
             uint16_t* mask_ptr_a = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt);
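The pattern that scan_a (when cache_a is set), scan_b and aggr_j now share is: submit the chunk to the cacher, optionally wait for the copy, then fall back to the original source pointer when GetDataLocation() returns nullptr. A minimal sketch of that fallback logic against a stand-in cache type; the real API lives in offloading-cacher/cache.hpp, and only the three calls used above are mirrored here:

#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <memory>

using base_t = uint64_t;

// stand-in for dsacache::CacheData, mirroring only the calls used in the pipeline
struct FakeCacheData {
    uint8_t* location = nullptr;           // would point to the cached (HBM) copy
    void WaitOnCompletion() {}              // the real call blocks until the DSA copy finished
    uint8_t* GetDataLocation() { return location; }
};

// stand-in for dsacache::Cache::Access(); always reports a miss here
std::unique_ptr<FakeCacheData> Access(uint8_t* /*src*/, size_t /*size*/) {
    return std::make_unique<FakeCacheData>();
}

void process_sub_chunk(base_t* chunk_ptr, size_t chunk_bytes, bool wait_on_copy) {
    const auto data = Access(reinterpret_cast<uint8_t*>(chunk_ptr), chunk_bytes);
    if (wait_on_copy) {
        data->WaitOnCompletion();           // give the copy engine time to finish
    }
    base_t* data_ptr = reinterpret_cast<base_t*>(data->GetDataLocation());
    if (data_ptr == nullptr) {
        // nullptr is a legal result even after waiting: work from the source buffer instead
        fprintf(stderr, "[!] Cache Miss - falling back to source buffer\n");
        data_ptr = chunk_ptr;
    }
    // the filter / aggregation kernels would run on data_ptr here
    (void)data_ptr;
}

int main() {
    static base_t chunk[1024] = {};
    process_sub_chunk(chunk, sizeof(chunk), /*wait_on_copy=*/true);
    return 0;
}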

387
qdp_project/src/benchmark/pipelines/scan_filter_pipe.h

@@ -1,387 +0,0 @@
#include <cassert>
#include <mutex>
#include <cstring>
#include <bitset>
#include <numa.h>
#include "filter.h"
#include "aggregation.h"
#include "vector_loader.h"
#include "timer_utils.h"
#include "barrier_utils.h"
#include "execution_modes.h"
template<typename base_t, bool simple>
class Query_Wrapper {
public:
// sync
std::shared_future<void>* ready_future;
thread_runtime_timing* trt;
barrier_timing* bt;
private:
// numa
uint32_t close_mem;
uint32_t far_mem;
// data
size_t size_b;
size_t chunk_size_b;
size_t chunk_size_w;
size_t chunk_cnt;
base_t* data_a;
base_t* data_b;
base_t* dest;
// ratios
uint32_t thread_count_fc;
uint32_t thread_count_fi;
uint32_t thread_count_ag;
uint32_t thread_group;
// done bits
volatile uint8_t* ready_flag_a;
volatile uint8_t* ready_flag_b;
std::mutex ready_a_m;
std::mutex ready_b_m;
// buffer
uint16_t* mask_a;
uint16_t* mask_b;
base_t** buffer_b;
// params
base_t cmp_a;
base_t cmp_b;
bool no_copy;
PMode mode;
// sync
std::unique_ptr<std::vector<std::barrier<barrier_completion_function>*>> sync_barrier;
std::string barrier_mode = BARRIER_MODE;
using filterCopy = Filter<base_t, LEQ, load_mode::Aligned, true>;
using filterNoCopy = Filter<base_t, LEQ, load_mode::Aligned, false>;
using filter = Filter<base_t, GEQ, load_mode::Aligned, false>;
using aggregation = Aggregation<base_t, Sum, load_mode::Aligned>;
public:
Query_Wrapper(std::shared_future<void>* rdy_fut, size_t workload_b, size_t chunk_size_b, base_t* data_a,
base_t* data_b, base_t* dest, uint32_t numa_close, uint32_t numa_far, uint32_t tc_fi, uint32_t tc_fc, uint32_t tc_ag,
PMode mode, uint32_t thread_group, base_t cmp_a = 50, base_t cmp_b = 42, bool no_copy = false) :
ready_future(rdy_fut), size_b(workload_b), chunk_size_b(chunk_size_b), data_a(data_a), data_b(data_b),
dest(dest), close_mem(numa_close), far_mem(numa_far), mode(mode), thread_group(thread_group), cmp_a(cmp_a), cmp_b(cmp_b), no_copy(no_copy){
chunk_size_w = chunk_size_b / sizeof(base_t);
chunk_cnt = size_b / chunk_size_b;
thread_count_fi = tc_fi;
thread_count_fc = tc_fc;
thread_count_ag = tc_ag;
ready_flag_a = (volatile uint8_t *) numa_alloc_onnode(
chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0), close_mem);
ready_flag_b = (volatile uint8_t *) numa_alloc_onnode(
chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0), close_mem);
mask_a = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), close_mem);
mask_b = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), close_mem);
trt = new thread_runtime_timing(4, 20, close_mem);
bt = new barrier_timing(4, 20, close_mem);
reset_barriers();
if constexpr(BUFFER_LIMIT==1) {
// TODO size ok like that?
buffer_b = (base_t**) numa_alloc_onnode(size_b * sizeof(base_t*), close_mem);
buffer_b[0] = (base_t*) numa_alloc_onnode(thread_group * chunk_size_b, close_mem);
buffer_b[1] = (base_t*) numa_alloc_onnode(thread_group * chunk_size_b, close_mem);
} else {
buffer_b = (base_t **) numa_alloc_onnode(sizeof(base_t*), close_mem);
base_t* buffer_tmp = (base_t *) numa_alloc_onnode(size_b, close_mem);
*buffer_b = buffer_tmp;
}
};
void reset_barriers(){
if(sync_barrier != nullptr) {
for(auto& barrier : *sync_barrier) {
delete barrier;
}
sync_barrier.reset();
}
sync_barrier = std::make_unique<std::vector<std::barrier<barrier_completion_function>*>>(thread_group);
uint32_t thread_count_sum = thread_count_ag + thread_count_fi + thread_count_fc;
uint32_t barrier_count = barrier_mode.compare("global") == 0 ? 1 : thread_group;
uint32_t barrier_thread_count;
if constexpr(simple){
barrier_thread_count = (thread_group / barrier_count) *
(mode == PMode::expl_copy ? thread_count_sum : (thread_count_ag + thread_count_fi));
} else {
barrier_thread_count = (thread_group / barrier_count) * thread_count_sum;
}
for(uint32_t i = 0; i < barrier_count; ++i) {
(*sync_barrier)[i] = new std::barrier<barrier_completion_function>(barrier_thread_count);
}
}
void clear_buffers () {
std::memset((void*)ready_flag_a, 0x00, chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0));
std::memset((void*)ready_flag_b, 0x00, chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0));
std::memset(mask_a, 0x00, size_b / sizeof(base_t));
std::memset(mask_b, 0x00, size_b / sizeof(base_t));
if constexpr(BUFFER_LIMIT==1) {
std::memset(buffer_b[0], 0x00, thread_group * chunk_size_b);
std::memset(buffer_b[1], 0x00, thread_group * chunk_size_b);
} else {
std::memset(*buffer_b, 0x00, size_b);
}
trt->reset_accumulator();
bt->reset_accumulator();
reset_barriers();
};
~Query_Wrapper() {
numa_free((void*)ready_flag_a,
chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0));
numa_free((void*)ready_flag_b,
chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0));
numa_free(mask_a, size_b / sizeof(base_t));
numa_free(mask_b, size_b / sizeof(base_t));
if constexpr(BUFFER_LIMIT==1) {
numa_free(buffer_b[0], thread_group * chunk_size_b);
numa_free(buffer_b[1], thread_group * chunk_size_b);
numa_free(buffer_b, size_b * sizeof(base_t*));
} else {
numa_free(*buffer_b, size_b);
}
delete trt;
for(auto& barrier : *sync_barrier) {
delete barrier;
}
delete bt;
};
private:
static inline base_t* get_sub_chunk_ptr(base_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid,
size_t tcnt) {
base_t* chunk_ptr = base_ptr + chunk_id * chunk_size_w;
return chunk_ptr + tid * (chunk_size_w / tcnt);
}
static inline uint16_t* get_sub_mask_ptr(uint16_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid,
size_t tcnt) {
// 16 integer are addressed with one uint16_t in mask buffer
size_t offset = chunk_id * chunk_size_w + tid * (chunk_size_w / tcnt);
return base_ptr + (offset / 16);
}
static bool bit_at(volatile uint8_t* bitmap, uint32_t bitpos) {
uint8_t value = bitmap[bitpos / 8];
switch(bitpos % 8) {
case 0: return value & 0b00000001;
case 1: return value & 0b00000010;
case 2: return value & 0b00000100;
case 3: return value & 0b00001000;
case 4: return value & 0b00010000;
case 5: return value & 0b00100000;
case 6: return value & 0b01000000;
case 7: return value & 0b10000000;
default: return false;
}
}
static void set_bit_at(volatile uint8_t* bitmap, std::mutex& mutex, uint32_t bitpos) {
mutex.lock();
switch(bitpos % 8) {
case 0: bitmap[bitpos / 8] |= 0b00000001;break;
case 1: bitmap[bitpos / 8] |= 0b00000010;break;
case 2: bitmap[bitpos / 8] |= 0b00000100;break;
case 3: bitmap[bitpos / 8] |= 0b00001000;break;
case 4: bitmap[bitpos / 8] |= 0b00010000;break;
case 5: bitmap[bitpos / 8] |= 0b00100000;break;
case 6: bitmap[bitpos / 8] |= 0b01000000;break;
case 7: bitmap[bitpos / 8] |= 0b10000000;break;
}
mutex.unlock();
}
public:
static base_t checksum(base_t* a, base_t* b, base_t cmp_a, base_t cmp_b, size_t size_b) {
base_t sum = 0;
for(int i = 0; i < size_b / sizeof(base_t); ++i) {
if(a[i] >= cmp_a && b[i] <= cmp_b) {
sum += b[i];
}
}
return sum;
}
static void checkmask(uint16_t* mask, base_t cmp, base_t* data, size_t size_b, bool leq) {
uint32_t cnt = 0;
for(int i = 0; i < size_b / sizeof(base_t); ++i) {
if(leq) {
if(((data[i] <= cmp) != bit_at((uint8_t*)mask, i))) {
++cnt;
}
} else {
if(((data[i] >= cmp) != bit_at((uint8_t*)mask, i))) {
++cnt;
}
}
}
}
static void checkmask_16(uint16_t* mask, base_t cmp, base_t* data, size_t size_b, bool leq) {
for(int i = 0; i < size_b / sizeof(base_t) / 16 ; ++i) {
std::bitset<16> m(mask[i]);
uint16_t ch = 0;
for(int j = 0; j < 16; ++j) {
if(data[i*16 + j] <= cmp) {
ch |= 0x1 << j;
}
}
std::bitset<16> c(ch);
std::cout << "act " << m << std::endl;
std::cout << "rea " << c << std::endl << std::endl;
}
}
void scan_b(size_t gid, size_t gcnt, size_t tid) {
size_t tcnt = thread_count_fc;
assert(chunk_size_w % tcnt == 0);
assert(chunk_size_w % 16 == 0);
assert(chunk_size_w % tcnt * 16 == 0);
// wait till everyone can start
ready_future->wait();
// the lower gids run once more if the chunks are not evenly distributable
uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid);
uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid;
for(uint32_t i = 0; i < runs; ++i) {
trt->start_timer(1, tid * gcnt + gid);
// calculate pointers
size_t chunk_id = gid + gcnt * i;
base_t* chunk_ptr = get_sub_chunk_ptr(data_b , chunk_id, chunk_size_w, tid, tcnt);
uint16_t* mask_ptr = get_sub_mask_ptr (mask_b , chunk_id, chunk_size_w, tid, tcnt);
if constexpr(simple){
base_t* buffer_ptr;
if constexpr(BUFFER_LIMIT==1) {
buffer_ptr = get_sub_chunk_ptr(buffer_b[i % 2], gid, chunk_size_w, tid, tcnt);
} else {
buffer_ptr = get_sub_chunk_ptr(*buffer_b, chunk_id, chunk_size_w, tid, tcnt);
}
std::memcpy(buffer_ptr, chunk_ptr, chunk_size_b / tcnt);
} else {
if(no_copy) {
filterNoCopy::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_b, chunk_size_b / tcnt);
} else {
base_t* buffer_ptr;
if constexpr(BUFFER_LIMIT==1) {
buffer_ptr = get_sub_chunk_ptr(buffer_b[i % 2], gid, chunk_size_w, tid, tcnt);
} else {
buffer_ptr = get_sub_chunk_ptr(*buffer_b, chunk_id, chunk_size_w, tid, tcnt);
}
filterCopy::apply_same(mask_ptr, buffer_ptr, chunk_ptr, cmp_b, chunk_size_b / tcnt);
}
}
trt->stop_timer(1, tid * gcnt + gid);
bt->timed_wait(*(*sync_barrier)[barrier_idx], 1, tid * gcnt + gid);
}
(*(*sync_barrier)[barrier_idx]).arrive_and_drop();
}
void scan_a(size_t gid, size_t gcnt, size_t tid) {
size_t tcnt = thread_count_fi;
assert(chunk_size_w % tcnt == 0);
assert(chunk_size_w % 16 == 0);
assert(chunk_size_w % tcnt * 16 == 0);
// wait till everyone can start
ready_future->wait();
// the lower gids run once more if the chunks are not evenly distributable
uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid);
uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid;
for(uint32_t i = 0; i < runs; ++i) {
trt->start_timer(0, tid * gcnt + gid);
// calculate pointers
size_t chunk_id = gid + gcnt * i;
base_t* chunk_ptr = get_sub_chunk_ptr(data_a, chunk_id, chunk_size_w, tid, tcnt);
uint16_t* mask_ptr = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt);
filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt);
trt->stop_timer(0, tid * gcnt + gid);
bt->timed_wait(*(*sync_barrier)[barrier_idx], 0, tid * gcnt + gid);
}
(*(*sync_barrier)[barrier_idx]).arrive_and_drop();
}
void aggr_j(size_t gid, size_t gcnt, size_t tid) {
size_t tcnt = thread_count_ag;
// wait till everyone can start
ready_future->wait();
// calculate values
__m512i aggregator = aggregation::OP::zero();
// the lower gids run once more if the chunks are not evenly distributable
uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid);
uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid;
for(uint32_t i = 0; i < runs; ++i) {
bt->timed_wait(*(*sync_barrier)[barrier_idx], 2, tid * gcnt + gid);
trt->start_timer(2, tid * gcnt + gid);
// calculate pointers
size_t chunk_id = gid + gcnt * i;
base_t* chunk_ptr;
if(no_copy) {
chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt);
} else {
if constexpr(BUFFER_LIMIT==1) {
chunk_ptr = get_sub_chunk_ptr(buffer_b[i%2], gid, chunk_size_w, tid, tcnt);
} else {
chunk_ptr = get_sub_chunk_ptr(*buffer_b, chunk_id, chunk_size_w, tid, tcnt);
}
}
uint16_t* mask_ptr_a = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt);
uint16_t* mask_ptr_b = get_sub_mask_ptr (mask_b, chunk_id, chunk_size_w, tid, tcnt);
base_t tmp = _mm512_reduce_add_epi64(aggregator);
if constexpr(simple){
aggregator = aggregation::apply_masked(aggregator, chunk_ptr, mask_ptr_a, chunk_size_b / tcnt);
} else {
aggregator = aggregation::apply_masked(aggregator, chunk_ptr, mask_ptr_a, mask_ptr_b, chunk_size_b / tcnt);
}
trt->stop_timer(2, tid * gcnt + gid);
}
// so threads with more runs dont wait for finished threads
(*(*sync_barrier)[barrier_idx]).arrive_and_drop();
aggregation::happly(dest + (tid * gcnt + gid), aggregator);
}
};

41
qdp_project/src/utils/execution_modes.h

@@ -55,17 +55,24 @@ struct new_mode_manager {
     };*/
     constexpr static int thread_counts[2][4][3] = {
+        // thread counts for both simple and complex querry
+        // inner layout: { scan_a, scan_b, aggr_j }
         //simple query
-        //scan_a, scan_b, aggr_j
-        {{4, 0, 2}, // DRAM_base
-        {4, 0, 2},  // HBM_base
-        {4, 0, 2},  // Mixed_base
-        {1, 4, 1}}, // Prefetching
+        {
+            {4, 0, 2}, // DRAM_base
+            {4, 0, 2}, // HBM_base
+            {4, 0, 2}, // Mixed_base
+            {4, 4, 4}  // Prefetching
+        },
         //complex query
-        {{1, 4, 1}, // DRAM_base
-        {1, 4, 1},  // HBM_base
-        {1, 4, 1},  // Mixed_base
-        {1, 4, 1}}, // Prefetching
+        {
+            {1, 4, 1}, // DRAM_base
+            {1, 4, 1}, // HBM_base
+            {1, 4, 1}, // Mixed_base
+            {4, 4, 4}  // Prefetching
+        }
     };
     static inline NewPMode inc(NewPMode value) {
@@ -81,9 +88,17 @@
     };
     static std::string string(NewPMode value) {
         switch(value) {
-            case DRAM_base: return "DRAM_Baseline";
-            case HBM_base: return "HBM_Baseline";
-            case Mixed_base: return "DRAM_HBM_Baseline";
-        } return "Q-d_Prefetching";
+            case DRAM_base:
+                return "DRAM_Baseline";
+            case HBM_base:
+                return "HBM_Baseline";
+            case Mixed_base:
+                return "DRAM_HBM_Baseline";
+            case Prefetch:
+                return "Q-d_Prefetching";
+            default:
+                std::cerr << "[x] Unknown Processing Mode" << std::endl;
+                exit(-1);
+        }
     };
 };
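For reference, the thread_counts table above is indexed by query complexity, processing mode and pipeline stage; with this change the Prefetching configuration runs four threads in every stage for both query types. A small sketch of the lookup, assuming the enum order matches the row comments (the actual accessor in new_mode_manager may be named differently):

#include <cstdio>

enum NewPMode { DRAM_base = 0, HBM_base, Mixed_base, Prefetch };

// copy of the table above: [simple=0 / complex=1][mode][scan_a, scan_b, aggr_j]
constexpr int thread_counts[2][4][3] = {
    { {4, 0, 2}, {4, 0, 2}, {4, 0, 2}, {4, 4, 4} },
    { {1, 4, 1}, {1, 4, 1}, {1, 4, 1}, {4, 4, 4} },
};

int main() {
    const bool simple_query = false;          // complex query
    const NewPMode mode = Prefetch;
    const int row = simple_query ? 0 : 1;
    printf("scan_a=%d scan_b=%d aggr_j=%d\n",
           thread_counts[row][mode][0],
           thread_counts[row][mode][1],
           thread_counts[row][mode][2]);      // prints 4 4 4
    return 0;
}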