|
|
#pragma once
#include <numa.h>

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <future>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>

#include <dml/dml.hpp>

#include "util/dml-helper.hpp"
#include "util/task-data.hpp"
// Stream fragment that yields "Location: <file>@<line>::<function>" followed
// by std::endl. It starts with a string literal, so it has to be placed to the
// right of a `<<` on an output stream.
#define LOG_CODE_INFO "Location: " << __FILE__ << "@" << __LINE__ << "::" << __FUNCTION__ << std::endl
// Writes an error banner and "Physical: [Node <task->numa_node> | Thread <tid>]"
// to std::cerr inside a braced block, then leaves an open
// `std::cerr << LOG_CODE_INFO` expression so the caller can append further
// output with `<<`. Requires variables named `task` (pointer with a numa_node
// member) and `tid` to be in scope where it is expanded. It expands to a block
// plus a separate statement, so it must only be used inside braces.
#define LOG_ERR { std::cerr << "--- BEGIN ERROR MSG ---" << std::endl << "Physical: [Node " << task->numa_node << " | Thread " << tid << "]" << std::endl; } std::cerr << LOG_CODE_INFO
// If `stat` differs from dml::status_code::ok: logs the status text (via
// StatusCodeToString, which is declared outside this file - presumably in
// util/dml-helper.hpp) and `msg` through LOG_ERR, stores `stat` in
// task->status and returns from the enclosing function. Because of the bare
// `return;` it can only be used in functions returning void, and `stat` is
// evaluated more than once, so it should be a plain variable or member access.
#define CHECK_STATUS(stat,msg) { if (stat != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(stat) << std::endl << msg << std::endl; task->status = stat; return; }}
// These variables are defined in a header. They are declared `inline` so that
// including the header from more than one translation unit yields a single
// shared definition instead of a multiple-definition (ODR) error.

// Start signal for the worker threads: every thread_function blocks on it and
// execute_dml_memcpy re-arms and fulfils it once per iteration.
inline std::shared_future<void> LAUNCH_;

// Duration in nanoseconds of each measured (non-warmup) iteration, appended by
// execute_dml_memcpy.
inline std::vector<uint64_t> ITERATION_TIMING_;

// Source buffer of each worker, indexed by thread id.
inline std::vector<void*> SOURCE_;

// Destination buffer of each worker, indexed by thread id.
inline std::vector<void*> DESTINATION_;
template <typename path> void thread_function(const uint32_t tid, TaskData* task) { dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(SOURCE_[tid]), task->size); dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(DESTINATION_[tid]), task->size);
task->status = dml::status_code::ok;
LAUNCH_.wait();
if (task->batch_size > 1) { auto sequence = dml::sequence(task->batch_size, std::allocator<dml::byte_t>());
for (uint32_t j = 0; j < task->batch_size; j++) { const auto status = sequence.add(dml::mem_copy, srcv, dstv); CHECK_STATUS(status, "Adding operation to batch failed!"); }
// we use the asynchronous submit-routine even though this is not required
// here, however the project later on will only use async operation and
// therefore this behaviour should be benchmarked
auto handler = dml::submit<path>(dml::batch, sequence, dml::execution_interface<path, std::allocator<dml::byte_t>>(), task->numa_node);
auto result = handler.get();
const dml::status_code status = result.status; CHECK_STATUS(status, "Batch completed with an Error!"); } else { // we use the asynchronous submit-routine even though this is not required
// here, however the project later on will only use async operation and
// therefore this behaviour should be benchmarked
auto handler = dml::submit<path>(dml::mem_copy, srcv, dstv, dml::execution_interface<path, std::allocator<dml::byte_t>>(), task->numa_node);
auto result = handler.get();
const dml::status_code status = result.status; CHECK_STATUS(status, "Operation completed with an Error!"); } }
template <typename path> void execute_dml_memcpy(std::vector<TaskData>& args, const uint64_t iterations) { // initialize numa library
numa_available();
// initialize data fields for use
SOURCE_.resize(args.size()); DESTINATION_.resize(args.size());
for (uint32_t tid = 0; tid < args.size(); tid++) { SOURCE_[tid] = numa_alloc_onnode(args[tid].size, args[tid].nnode_src); DESTINATION_[tid] = numa_alloc_onnode(args[tid].size, args[tid].nnode_dst); std::memset(SOURCE_[tid], 0xAB, args[tid].size); std::memset(DESTINATION_[tid], 0xAB, args[tid].size); }
// for each requested iteration this is repeated, plus 5 iterations as warmup
for (uint64_t i = 0; i < iterations + 5; i++) { std::vector<std::thread> threads; std::promise<void> launch_promise; LAUNCH_ = launch_promise.get_future();
for (uint32_t tid = 0; tid < args.size(); tid++) { // we flush the cache for the memory regions to avoid any caching effects
dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(SOURCE_[tid]), args[tid].size); dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(DESTINATION_[tid]), args[tid].size); auto rsrc = dml::execute<path>(dml::cache_flush, srcv); auto rdst = dml::execute<path>(dml::cache_flush, dstv); TaskData* task = &args[tid]; CHECK_STATUS(rsrc.status, "Flushing Cache for Source failed!"); CHECK_STATUS(rdst.status, "Flushing Cache for Destination failed!");
// then spawn the thread
threads.emplace_back(thread_function<path>, tid, &args[tid]); }
using namespace std::chrono_literals; std::this_thread::sleep_for(1ms);
const auto time_start = std::chrono::steady_clock::now();
launch_promise.set_value();
for(std::thread& t : threads) { t.join(); }
const auto time_end = std::chrono::steady_clock::now();
if (i >= 5) ITERATION_TIMING_.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(time_end - time_start).count()); } }
|