Browse Source

Try to fix the barriers causing a lockup

master
Constantin Fürst 11 months ago
parent
commit
6a90fd6c5e
  1. 2
      qdp_project/CMakeLists.txt
  2. 13
      qdp_project/src/benchmark/MAX_benchmark.cpp
  3. 21
      qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h
  4. 104
      qdp_project/src/utils/execution_modes.h

2
qdp_project/CMakeLists.txt

@ -59,7 +59,7 @@ add_definitions(-DBUFFER_LIMIT=$<STREQUAL:${BUFFER_LIMIT},limited>)
eval(QUERY "simple;complex" "simple") eval(QUERY "simple;complex" "simple")
add_definitions(-DQUERY=$<STREQUAL:${QUERY},simple>) add_definitions(-DQUERY=$<STREQUAL:${QUERY},simple>)
eval(THREAD_FACTOR "1;2;3;4;5;6;7;8;9;10" "4")
eval(THREAD_FACTOR "1;2;3;4;5;6;7;8;9;10" "1")
add_definitions(-DTHREAD_GROUP_MULTIPLIER=${THREAD_FACTOR}) add_definitions(-DTHREAD_GROUP_MULTIPLIER=${THREAD_FACTOR})
# build directory # build directory

13
qdp_project/src/benchmark/MAX_benchmark.cpp

@ -126,7 +126,6 @@ int main(int argc, char** argv) {
// set benchmark parameter // set benchmark parameter
Linear_Int_Range<uint32_t, 0, 30, 1> run("run"); Linear_Int_Range<uint32_t, 0, 30, 1> run("run");
Linear_Int_Range<size_t, chunk_min, chunk_max, chunk_incr> chunk_size("chunk_size"); Linear_Int_Range<size_t, chunk_min, chunk_max, chunk_incr> chunk_size("chunk_size");
Range<NewPMode, DRAM_base, new_mode_manager, new_mode_manager> mode("mode");
print_to_file(out_file, generateHead(run, chunk_size, mode), "thread_group", "time", print_to_file(out_file, generateHead(run, chunk_size, mode), "thread_group", "time",
#ifdef THREAD_TIMINGS #ifdef THREAD_TIMINGS
@ -176,9 +175,9 @@ int main(int argc, char** argv) {
std::promise<void> p; std::promise<void> p;
std::shared_future<void> ready_future(p.get_future()); std::shared_future<void> ready_future(p.get_future());
uint8_t tc_filter = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_A);
uint8_t tc_copy = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_B);
uint8_t tc_agg = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, AGGR_J);
const uint8_t tc_filter = 4;
const uint8_t tc_copy = 1;
const uint8_t tc_agg = 2;
Query_Wrapper<base_t, simple_query, cache_a, wait_b> qw ( Query_Wrapper<base_t, simple_query, cache_a, wait_b> qw (
&ready_future, workload_b, chunk_size.current, &ready_future, workload_b, chunk_size.current,
@ -197,10 +196,10 @@ int main(int argc, char** argv) {
std::vector<std::thread> copy_pool; std::vector<std::thread> copy_pool;
std::vector<std::thread> agg_pool; std::vector<std::thread> agg_pool;
int thread_id = 0;
// int thread_id = 0;
// std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(28, 42), std::make_pair(84, 98)}; // node 2 heacboehm II // std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(28, 42), std::make_pair(84, 98)}; // node 2 heacboehm II
//std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(32, 48), std::make_pair(96, 112)}; // node 2 heacboehm //std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(32, 48), std::make_pair(96, 112)}; // node 2 heacboehm
std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(24, 36), std::make_pair(120, 132)}; // node 2 sapphire rapids
// std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(24, 36), std::make_pair(120, 132)}; // node 2 sapphire rapids
//std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(24, 48)}; // node 2+3 sapphire rapids //std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(24, 48)}; // node 2+3 sapphire rapids
//std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(0, 48)}; // node 0-3 sapphire rapids //std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(0, 48)}; // node 0-3 sapphire rapids
@ -234,7 +233,7 @@ int main(int argc, char** argv) {
double seconds = (double)(nanos) / nanos_per_second; double seconds = (double)(nanos) / nanos_per_second;
if (i >= 5) { if (i >= 5) {
print_to_file(out_file, run, chunk_size, new_mode_manager::string(mode.current), THREAD_GROUP_MULTIPLIER, seconds,
print_to_file(out_file, run, chunk_size, "HBM", THREAD_GROUP_MULTIPLIER, seconds,
#ifdef THREAD_TIMINGS #ifdef THREAD_TIMINGS
qw.trt->summarize_time(0), qw.trt->summarize_time(1), qw.trt->summarize_time(2), qw.trt->summarize_time(0), qw.trt->summarize_time(1), qw.trt->summarize_time(2),
#endif #endif

21
qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h

@ -13,7 +13,6 @@
#include "timer_utils.h" #include "timer_utils.h"
#include "barrier_utils.h" #include "barrier_utils.h"
#include "measurement_utils.h" #include "measurement_utils.h"
#include "execution_modes.h"
#include "../../../../offloading-cacher/cache.hpp" #include "../../../../offloading-cacher/cache.hpp"
@ -45,7 +44,6 @@ private:
uint32_t thread_count_fc; uint32_t thread_count_fc;
uint32_t thread_count_fi; uint32_t thread_count_fi;
uint32_t thread_count_ag; uint32_t thread_count_ag;
uint32_t thread_count;
// done bits // done bits
volatile uint8_t* ready_flag_a; volatile uint8_t* ready_flag_a;
@ -58,7 +56,6 @@ private:
// params // params
base_t cmp_a; base_t cmp_a;
base_t cmp_b; base_t cmp_b;
NewPMode mode;
// sync // sync
std::unique_ptr<std::vector<std::barrier<barrier_completion_function>*>> sync_barrier; std::unique_ptr<std::vector<std::barrier<barrier_completion_function>*>> sync_barrier;
@ -106,9 +103,9 @@ private:
public: public:
Query_Wrapper(std::shared_future<void>* rdy_fut, size_t workload_b, size_t chunk_size_b, base_t* data_a, Query_Wrapper(std::shared_future<void>* rdy_fut, size_t workload_b, size_t chunk_size_b, base_t* data_a,
base_t* data_b, base_t* dest, uint32_t tc_fi, uint32_t tc_fc, uint32_t tc_ag, base_t* data_b, base_t* dest, uint32_t tc_fi, uint32_t tc_fc, uint32_t tc_ag,
NewPMode mode, base_t cmp_a = 50, base_t cmp_b = 42) :
base_t cmp_a = 50, base_t cmp_b = 42) :
ready_future(rdy_fut), size_b(workload_b), chunk_size_b(chunk_size_b), data_a(data_a), data_b(data_b), ready_future(rdy_fut), size_b(workload_b), chunk_size_b(chunk_size_b), data_a(data_a), data_b(data_b),
dest(dest), mode(mode), cmp_a(cmp_a), cmp_b(cmp_b) {
dest(dest), cmp_a(cmp_a), cmp_b(cmp_b) {
const int current_cpu = sched_getcpu(); const int current_cpu = sched_getcpu();
const int current_node = numa_node_of_cpu(current_cpu); const int current_node = numa_node_of_cpu(current_cpu);
@ -121,8 +118,6 @@ public:
thread_count_fc = tc_fc; thread_count_fc = tc_fc;
thread_count_ag = tc_ag; thread_count_ag = tc_ag;
thread_count = tc_fi + tc_fc + tc_ag;
ready_flag_a = (volatile uint8_t *) numa_alloc_onnode( chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0), cache_node); ready_flag_a = (volatile uint8_t *) numa_alloc_onnode( chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0), cache_node);
ready_flag_b = (volatile uint8_t *) numa_alloc_onnode( chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0), cache_node); ready_flag_b = (volatile uint8_t *) numa_alloc_onnode( chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0), cache_node);
@ -146,18 +141,12 @@ public:
sync_barrier.reset(); sync_barrier.reset();
} }
sync_barrier = std::make_unique<std::vector<std::barrier<barrier_completion_function>*>>(thread_count);
uint32_t thread_count_sum = thread_count_ag + thread_count_fi + thread_count_fc; uint32_t thread_count_sum = thread_count_ag + thread_count_fi + thread_count_fc;
uint32_t barrier_count = barrier_mode.compare("global") == 0 ? 1 : thread_count;
uint32_t barrier_thread_count;
sync_barrier = std::make_unique<std::vector<std::barrier<barrier_completion_function>*>>(thread_count_sum);
uint32_t barrier_count = barrier_mode == "global" ? 1 : thread_count_sum;
if constexpr(simple){
barrier_thread_count = (thread_count / barrier_count) * (mode == NewPMode::Prefetch ? thread_count_sum : (thread_count_ag + thread_count_fi));
} else {
barrier_thread_count = (thread_count / barrier_count) * thread_count_sum;
}
for(uint32_t i = 0; i < barrier_count; ++i) { for(uint32_t i = 0; i < barrier_count; ++i) {
(*sync_barrier)[i] = new std::barrier<barrier_completion_function>(barrier_thread_count);
(*sync_barrier)[i] = new std::barrier<barrier_completion_function>(thread_count_sum);
} }
} }

104
qdp_project/src/utils/execution_modes.h

@ -1,104 +0,0 @@
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <string>
enum PMode{no_copy = 0, hbm = 1, expl_copy = 2};

/// Iteration helpers for PMode: advance, range check, and short label.
struct mode_manager {
    /// Advance to the next processing mode (may step past expl_copy; guard with pred()).
    static inline PMode inc(PMode value) {
        const int next = static_cast<int>(value) + 1;
        return static_cast<PMode>(next);
    };
    /// True while value lies inside the valid [no_copy, expl_copy] range.
    static inline bool pred(PMode value) {
        return !(value < no_copy) && !(expl_copy < value);
    };
    /// Short label used in result output; unknown values map to "no_copy".
    static std::string string(PMode value) {
        if (value == hbm)       return "hbm_pre";
        if (value == expl_copy) return "expl_co";
        return "no_copy";
    };
};
#define SIMPLE_Q 0
#define COMPLEX_Q 1

#define SCAN_A 0
#define SCAN_B 1
#define AGGR_J 2

enum NewPMode{DRAM_base = 0, HBM_base = 1, Mixed_base = 2, Prefetch = 3};

/// Iteration helpers and the per-mode worker thread-count table for NewPMode.
struct new_mode_manager {
    // Thread counts for both simple and complex query,
    // indexed as [query type][processing mode][thread type].
    // Inner layout: { scan_a, scan_b, aggr_j }.
    // NOTE: earlier experiments used {3,0,3} and {2,0,4} splits for the
    // simple-query baselines; kept here as a note instead of dead code.
    constexpr static int thread_counts[2][4][3] = {
        // simple query
        {
            {4, 0, 2}, // DRAM_base
            {4, 0, 2}, // HBM_base
            {4, 0, 2}, // Mixed_base
            {4, 4, 4}  // Prefetching
        },
        // complex query
        {
            {1, 4, 1}, // DRAM_base
            {1, 4, 1}, // HBM_base
            {1, 4, 1}, // Mixed_base
            {4, 4, 4}  // Prefetching
        }
    };
    /// Advance to the next processing mode (may step past Prefetch; guard with pred()).
    static inline NewPMode inc(NewPMode value) {
        return static_cast<NewPMode>(value + 1);
    };
    /// True while value lies inside the valid [DRAM_base, Prefetch] range.
    static inline bool pred(NewPMode value) {
        return DRAM_base <= value && value <= Prefetch;
    };
    /// Look up the configured thread count for a query type / mode / thread type.
    /// All three indices are clamped into range so a stale or corrupted caller
    /// value can never read past the table (mode was previously unchecked).
    static int thread_count(uint8_t query_type, NewPMode mode, uint8_t thread_type){
        if(query_type > 1) query_type = 1;
        if(thread_type > 2) thread_type = 2;
        if(mode < DRAM_base || mode > Prefetch) mode = Prefetch; // was an unchecked OOB read
        return (thread_counts[query_type][mode][thread_type]);
    };
    /// Human-readable mode name used in result files; aborts on an unknown value.
    static std::string string(NewPMode value) {
        switch(value) {
            case DRAM_base:  return "DRAM_Baseline";
            case HBM_base:   return "HBM_Baseline";
            case Mixed_base: return "DRAM_HBM_Baseline";
            case Prefetch:   return "Q-d_Prefetching";
            default:
                std::cerr << "[x] Unknown Processing Mode" << std::endl;
                exit(-1);
        }
    };
};
Loading…
Cancel
Save