|
|
#pragma once
#include <iostream>
#include <vector>
#include <chrono>
#include <pthread.h>
#include <semaphore.h>
#include <numa.h>
#include <dml/dml.hpp>
// Per-thread parameter/result record handed to thread_function via pthread_create.
struct ThreadArgs {
    // --- thread placement / engine selection ---
    uint8_t numa_node;  // node passed to numa_run_on_node for the worker thread
    uint8_t core;       // core the worker thread's affinity is set to

    // --- region size and source + destination for the move ---
    size_t size;        // number of bytes allocated for each of the two buffers
    uint8_t nnode_src;  // node the source buffer is allocated on
    uint8_t nnode_dst;  // node the destination buffer is allocated on

    // --- thread output ---
    dml::status_code status;             // status of the mem_move result, written by the worker
    std::chrono::microseconds duration;  // time measured around submit + get, written by the worker

    // --- set by execution ---
    sem_t* sig;  // start semaphore; assigned by execute_mem_move, reset to nullptr by the worker
};
template <typename path> void* thread_function(void* argp) { ThreadArgs* args = reinterpret_cast<ThreadArgs*>(argp);
// set numa node and core affinity of the current thread
numa_run_on_node(args->numa_node); cpu_set_t cpuset; CPU_ZERO(&cpuset); CPU_SET(args->core, &cpuset); if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset) != 0) { std::cerr << "Error setting affinity for thread designated to core " << args->core << " on node " << args->numa_node << std::endl; return nullptr; } // allocate memory for the move operation on the requested numa nodes
void* src = numa_alloc_onnode(args->size, args->nnode_src); void* dst = numa_alloc_onnode(args->size, args->nnode_dst); dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(src), args->size); dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(dst), args->size);
// wait for specified signal so that all operations start at the same time
sem_wait(args->sig); const auto st = std::chrono::high_resolution_clock::now();
// we use the asynchronous submit-routine even though this is not required
// here, however the project later on will only use async operation
auto handler = dml::submit<path>(dml::mem_move, srcv, dstv); auto result = handler.get();
const auto et = std::chrono::high_resolution_clock::now();
// free the allocated memory regions on the selected nodes
numa_free(src, args->size); numa_free(dst, args->size);
args->duration = std::chrono::duration_cast<std::chrono::microseconds>(et - st); args->status = result.status; args->sig = nullptr;
return nullptr; }
template <typename path> void execute_mem_move(std::vector<ThreadArgs>& args) { sem_t sem; std::vector<pthread_t> threads;
// initialize semaphore and numactl-library
sem_init(&sem, 0, 0); numa_available();
// for each submitted task we link the semaphore
// and create the thread, passing the argument
for (auto& arg : args) { arg.sig = &sem; threads.emplace_back();
if (pthread_create(&threads.back(), nullptr, thread_function<path>, &arg) != 0) { std::cerr << "Error creating thread" << std::endl; exit(1); } }
// post will make all waiting threads pass
sem_post(&sem);
for (pthread_t& t : threads) { pthread_join(t, nullptr); }
sem_destroy(&sem); }
|