From 4e9688224bae1704b8667a06a9c18d69e1964965 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Constantin=20F=C3=BCrst?=
Date: Mon, 27 Nov 2023 13:29:38 +0100
Subject: [PATCH] create a custom barrier structure that allows synchronization
 of each iteration of the measurement loop

---
 benchmarks/barrier.hpp           | 66 ++++++++++++++++++++++++++++++++
 benchmarks/benchmark.hpp         | 23 +++++------
 benchmarks/task-data.hpp         | 18 ++++-----
 benchmarks/task-description.json |  6 +--
 4 files changed, 86 insertions(+), 27 deletions(-)
 create mode 100644 benchmarks/barrier.hpp

diff --git a/benchmarks/barrier.hpp b/benchmarks/barrier.hpp
new file mode 100644
index 0000000..90a5835
--- /dev/null
+++ b/benchmarks/barrier.hpp
@@ -0,0 +1,66 @@
+#pragma once
+
+#include <condition_variable>
+#include <cstdint>
+#include <mutex>
+
+// Reusable rendezvous point for a fixed number of threads. wait() blocks
+// until barrier_size_ threads have arrived, releases all of them together
+// and re-arms itself, so it can be passed once per loop iteration.
+class barrier {
+private:
+    std::mutex mutex_;
+    std::condition_variable released_;
+    // threads that have arrived in the current round
+    uint32_t waiting_count_;
+    // bumped on every release; a waiter sleeps until it changes, which
+    // keeps spurious wakeups and early arrivals of the next round from
+    // leaking through the current one
+    uint64_t generation_;
+    const uint32_t barrier_size_;
+
+    barrier(const barrier& other) = delete;
+    barrier& operator=(const barrier& other) = delete;
+
+    // opens the current round; mutex_ must be held by the caller
+    void release_locked();
+
+public:
+    // size: number of threads that must call wait() to complete a round
+    barrier(const uint32_t size);
+    ~barrier();
+
+    // blocks until the round is complete (or signal() is called)
+    void wait();
+    // releases all threads currently blocked in wait() and starts a new round
+    void signal();
+};
+
+inline barrier::~barrier() = default;
+
+inline barrier::barrier(const uint32_t size)
+    : waiting_count_(0), generation_(0), barrier_size_(size) {}
+
+inline void barrier::release_locked() {
+    waiting_count_ = 0;
+    ++generation_;
+    released_.notify_all();
+}
+
+inline void barrier::wait() {
+    std::unique_lock<std::mutex> lock(mutex_);
+
+    if (++waiting_count_ >= barrier_size_) {
+        // last arrival: reset the counter so the next round starts from zero
+        release_locked();
+        return;
+    }
+
+    const uint64_t arrival_generation = generation_;
+    released_.wait(lock, [&] { return generation_ != arrival_generation; });
+}
+
+inline void barrier::signal() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    release_locked();
+}
diff --git a/benchmarks/benchmark.hpp b/benchmarks/benchmark.hpp index a92ba79..a535c65 100644 --- a/benchmarks/benchmark.hpp +++ b/benchmarks/benchmark.hpp @@ -11,6 +11,7 @@ #include +#include "barrier.hpp" #include "statuscode-tostring.hpp" #include "task-data.hpp" @@ -20,7 +21,7 @@ double avg(const std::vector& v) { #define LOG_CODE_INFO "Location: " << __FILE__ << "@" << __LINE__ << "::" << __FUNCTION__ << std::endl #define LOG_ERR { pthread_t t = pthread_self(); std::cerr << "--- BEGIN ERROR MSG ---" << 
std::endl << "Physical: [Node " << args->numa_node << " | Thread " << t << "]" << std::endl; } std::cerr << LOG_CODE_INFO -#define CHECK_STATUS(status,msg) { if (status != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(status) << std::endl << ##msg << std::endl; args->status = status; return nullptr; }} +#define CHECK_STATUS(status,msg) { if (status != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(status) << std::endl << msg << std::endl; args->status = status; return nullptr; }} template void* thread_function(void* argp) { @@ -42,10 +43,11 @@ void* thread_function(void* argp) { args->status = dml::status_code::ok; args->rep_completed = 0; - // wait for specified signal so that all operations start at the same time - sem_wait(args->sig); - for (uint32_t i = 0; i < args->rep_count; i++) { + // synchronize the start of each iteration + // using the barrier structure + args->barrier_->wait(); + if (args->batch_submit) { uint32_t opcount = args->batch_size; @@ -112,24 +114,22 @@ void* thread_function(void* argp) { args->combined_duration = avg(combined_durations); args->complete_duration = avg(completion_durations); args->submit_duration = avg(submission_durations); - args->sig = nullptr; return nullptr; } template void execute_dml_memcpy(std::vector& args) { - sem_t sem; + barrier task_barrier(args.size()); std::vector threads; - // initialize semaphore and numactl-library - sem_init(&sem, 0, 0); + // initialize numa library numa_available(); // for each submitted task we link the semaphore // and create the thread, passing the argument for (auto& arg : args) { - arg.sig = &sem; + arg.barrier_ = &task_barrier; threads.emplace_back(); if (pthread_create(&threads.back(), nullptr, thread_function, &arg) != 0) { @@ -138,12 +138,7 @@ void execute_dml_memcpy(std::vector& args) { } } - // post will make all waiting threads pass - sem_post(&sem); - for (pthread_t& t : threads) { pthread_join(t, nullptr); } - - 
sem_destroy(&sem); } \ No newline at end of file diff --git a/benchmarks/task-data.hpp b/benchmarks/task-data.hpp index 273a094..17ca8a0 100644 --- a/benchmarks/task-data.hpp +++ b/benchmarks/task-data.hpp @@ -5,6 +5,7 @@ #include "json/single_include/nlohmann/json.hpp" #include "statuscode-tostring.hpp" +#include "barrier.hpp" struct TaskData { // thread placement / engine selection @@ -27,30 +28,29 @@ struct TaskData { // completed iterations uint32_t rep_completed; // set by execution - sem_t* sig; + barrier* barrier_; }; inline void to_json(nlohmann::json& j, const TaskData& a) { j["task"]["size"] = a.size; - j["task"]["iterations"]["desired"] = a.rep_count; - j["task"]["iterations"]["actual"] = a.rep_completed; + j["task"]["iterations"] = a.rep_count; j["task"]["batching"]["enabled"] = a.batch_submit; j["task"]["batching"]["batch_size"] = a.batch_size; j["task"]["batching"]["barrier_after_n_operations"] = a.barrier_after_n_operations; j["affinity"]["node"] = a.numa_node; j["affinity"]["nnode_src"] = a.nnode_src; j["affinity"]["nnode_dst"] = a.nnode_dst; - j["time"]["unit"] = "microseconds"; - j["time"]["summation"] = "average"; - j["time"]["completion"] = a.complete_duration; - j["time"]["submission"] = a.submit_duration; - j["time"]["combined"] = a.combined_duration; + j["report"]["time"]["unit"] = "microseconds"; + j["report"]["time"]["completion_avg"] = a.complete_duration; + j["report"]["time"]["submission_avg"] = a.submit_duration; + j["report"]["time"]["combined_avg"] = a.combined_duration; + j["report"]["iterations_completed"] = a.rep_completed; j["report"]["status"] = StatusCodeToString(a.status); } inline void from_json(const nlohmann::json& j, TaskData& a) { j["task"]["size"].get_to(a.size); - j["task"]["iterations"]["desired"].get_to(a.rep_count); + j["task"]["iterations"].get_to(a.rep_count); j["task"]["batching"]["enabled"].get_to(a.batch_submit); j["task"]["batching"]["batch_size"].get_to(a.batch_size); 
j["task"]["batching"]["barrier_after_n_operations"].get_to(a.barrier_after_n_operations); diff --git a/benchmarks/task-description.json b/benchmarks/task-description.json index 1172f8b..3d6c3cf 100644 --- a/benchmarks/task-description.json +++ b/benchmarks/task-description.json @@ -5,11 +5,9 @@ { "task": { "size": 4096, - "iterations": { - "desired": 10000 - }, + "iterations": 10000, "batching": { - "enabled": true, + "enabled": false, "batch_size": 100, "barrier_after_n_operations": 10 }