#pragma once

#include <cstdint>
#include <cstring>
#include <cstdlib>
#include <iostream>
#include <vector>
#include <memory>
#include <chrono>
#include <thread>
#include <future>

#include <numa.h>
#include <dml/dml.hpp>

#include "util/dml-helper.hpp"
#include "util/task-data.hpp"

#define LOG_CODE_INFO "Location: " << __FILE__ << "@" << __LINE__ << "::" << __FUNCTION__ << std::endl
#define LOG_ERR { std::cerr << "--- BEGIN ERROR MSG ---" << std::endl << "Physical: [Node " << task->numa_node << " | Thread " << tid << "]" << std::endl; } std::cerr << LOG_CODE_INFO
#define CHECK_STATUS(stat,msg) { if (stat != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(stat) << std::endl << msg << std::endl; return; }}

std::shared_future<void> LAUNCH_;
std::vector<uint64_t> ITERATION_TIMING_;
std::vector<std::vector<void*>> SOURCE_;
std::vector<std::vector<void*>> DESTINATION_;

template <typename path>
void thread_function(const uint32_t tid, TaskData* task) {
    LAUNCH_.wait();

    for (uint32_t i = 0; i < task->rep_count; i++) {
        dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(SOURCE_[tid][i]), task->size);
        dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(DESTINATION_[tid][i]), task->size);

        if (task->batch_size > 1) {
            auto sequence = dml::sequence(task->batch_size, std::allocator<dml::byte_t>());

            for (uint32_t j = 0; j < task->batch_size; j++) {
                const auto status = sequence.add(dml::mem_copy, srcv, dstv);
                CHECK_STATUS(status, "Adding operation to batch failed!");
            }

            // We use the asynchronous submit routine even though it is not
            // required here; the project will later rely exclusively on
            // asynchronous operation, so this behaviour should be benchmarked.
            // NOTE: the template arguments of the execution interface were
            // lost in extraction; the default interface for the chosen
            // execution path is assumed here.
            auto handler = dml::submit<path>(dml::batch, sequence, dml::default_execution_interface<path>(), task->numa_node);
            auto result = handler.get();

            const dml::status_code status = result.status;
            CHECK_STATUS(status, "Batch completed with an Error!");
        }
        else {
            // As above, we deliberately benchmark the asynchronous submit
            // routine and assume the default execution interface.
            auto handler = dml::submit<path>(dml::mem_copy, srcv, dstv, dml::default_execution_interface<path>(), task->numa_node);
            auto result = handler.get();

            const dml::status_code status = result.status;
            CHECK_STATUS(status, "Operation completed with an Error!");
        }
    }
}

template <typename path>
void flush_cache(std::vector<TaskData>& args) {
    auto flush_container = [&args](std::vector<std::vector<void*>>& container) {
        if (container.size() != args.size()) {
            std::cerr << LOG_CODE_INFO << "Failed Clearing Cache due to size mismatch between tasks and entries!";
            exit(-1);
        }

        for (uint32_t i = 0; i < args.size(); i++) {
            for (auto ptr : container[i]) {
                dml::data_view view = dml::make_view(reinterpret_cast<uint8_t*>(ptr), args[i].size);
                auto result = dml::execute<path>(dml::cache_flush, view);

                if (result.status != dml::status_code::ok) {
                    std::cerr << LOG_CODE_INFO << "Failed Clearing Cache!";
                    exit(-1);
                }
            }
        }
    };

    flush_container(DESTINATION_);
    flush_container(SOURCE_);
}

void alloc_data_fields(std::vector<TaskData>& args) {
    SOURCE_.resize(args.size());
    DESTINATION_.resize(args.size());

    for (uint32_t tid = 0; tid < args.size(); tid++) {
        DESTINATION_[tid].resize(args[tid].rep_count);
        SOURCE_[tid].resize(args[tid].rep_count);

        for (uint32_t r = 0; r < args[tid].rep_count; r++) {
            SOURCE_[tid][r] = numa_alloc_onnode(args[tid].size, args[tid].nnode_src);
            DESTINATION_[tid][r] = numa_alloc_onnode(args[tid].size, args[tid].nnode_dst);
            std::memset(SOURCE_[tid][r], 0xAB, args[tid].size);
            std::memset(DESTINATION_[tid][r], 0xAB, args[tid].size);
        }
    }
}

void dealloc_data_fields(std::vector<TaskData>& args) {
    for (uint32_t tid = 0; tid < args.size(); tid++) {
        for (uint32_t r = 0; r < args[tid].rep_count; r++) {
            numa_free(SOURCE_[tid][r], args[tid].size);
            numa_free(DESTINATION_[tid][r], args[tid].size);
        }
    }

    SOURCE_.clear();
    DESTINATION_.clear();
}

template <typename path>
void execute_dml_memcpy(std::vector<TaskData>& args, const uint64_t iterations) {
    // initialize the numa library
    numa_available();

    // initialize data fields for use
    alloc_data_fields(args);

    // repeat for each requested iteration, plus 5 iterations as warmup
    for (uint64_t i = 0; i < iterations + 5; i++) {
        std::vector<std::thread> threads;
        std::promise<void> launch_promise;
        LAUNCH_ = launch_promise.get_future();

        // flush the cache for the memory regions to avoid any caching effects
        flush_cache<path>(args);

        // for each requested task, spawn a thread and pass the task description
        // and the thread id for accessing the per-thread source and destination pointers
        for (uint32_t tid = 0; tid < args.size(); tid++) {
            threads.emplace_back(thread_function<path>, tid, &args[tid]);
        }

        // sleep briefly so that, ideally, all threads have reached the barrier by then
        using namespace std::chrono_literals;
        std::this_thread::sleep_for(1ms);

        const auto time_start = std::chrono::steady_clock::now();

        launch_promise.set_value();

        for (std::thread& t : threads) {
            t.join();
        }

        const auto time_end = std::chrono::steady_clock::now();

        // record the timing, skipping the 5 warmup iterations
        // NOTE: the duration unit was lost in extraction; nanoseconds are assumed
        if (i >= 5) ITERATION_TIMING_.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(time_end - time_start).count());
    }

    dealloc_data_fields(args);
}
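
// ---------------------------------------------------------------------------
// Usage sketch (illustrative only): a minimal driver for this header, assuming
// the TaskData fields used above and the dml::hardware execution path. The
// main() wrapper and the chosen field values are hypothetical; only TaskData,
// execute_dml_memcpy() and ITERATION_TIMING_ come from this project.
//
//   int main() {
//       TaskData task;
//       task.size = 1 << 20;   // copy 1 MiB per repetition
//       task.rep_count = 100;  // 100 copies per thread and iteration
//       task.batch_size = 1;   // single submission, no batching
//       task.numa_node = 0;    // submit to the accelerator on node 0
//       task.nnode_src = 0;    // allocate the source buffers on node 0
//       task.nnode_dst = 1;    // allocate the destination buffers on node 1
//
//       std::vector<TaskData> args = { task };
//       execute_dml_memcpy<dml::hardware>(args, 10);
//
//       // one timing entry per non-warmup iteration
//       for (const uint64_t timing : ITERATION_TIMING_) {
//           std::cout << timing << std::endl;
//       }
//
//       return 0;
//   }
// ---------------------------------------------------------------------------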