This contains my bachelor's thesis and associated TeX files, code snippets and maybe more. Topic: Data Movement in Heterogeneous Memories with Intel Data Streaming Accelerator
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

144 lines
5.3 KiB

#pragma once
#include <iostream>
#include <vector>
#include <chrono>
#include <numeric>
#include <pthread.h>
#include <semaphore.h>
#include <numa.h>
#include <dml/dml.hpp>
#include "barrier.hpp"
#include "statuscode-tostring.hpp"
#include "task-data.hpp"
/// Computes the arithmetic mean of a list of unsigned 64-bit samples.
///
/// The samples are summed as uint64_t (the previous int accumulator truncated
/// every partial sum to 32 bits) and the division is carried out in long
/// double before the result is narrowed to double.
///
/// Declared inline because this header defines the function and may be
/// included from more than one translation unit.
///
/// @param v samples to average
/// @return the mean of v, or 0.0 if v is empty
inline double avg(const std::vector<uint64_t>& v) {
    if (v.empty()) {
        // avoid the 0 / 0 division that would otherwise yield NaN
        return 0.0;
    }

    const uint64_t sum = std::accumulate(v.begin(), v.end(), uint64_t{0});

    return static_cast<double>(static_cast<long double>(sum) / static_cast<long double>(v.size()));
}
// Stream fragment describing the current source position in the form
// "Location: <file>@<line>::<function>" followed by std::endl. It does not
// name a stream itself and is meant to follow a `<<` of an output stream.
#define LOG_CODE_INFO "Location: " << __FILE__ << "@" << __LINE__ << "::" << __FUNCTION__ << std::endl
// Starts an error report on std::cerr: first prints a header line and the
// NUMA node (args->numa_node) plus the pthread id of the calling thread, then
// leaves an open `std::cerr << LOG_CODE_INFO` expression that the caller
// continues with further `<<` operands and terminates with `;`.
// Requires a pointer named `args` with a `numa_node` member in scope.
// Expands to a braced block followed by a separate expression, i.e. to more
// than one statement, so it must not be the sole body of an unbraced if/else.
#define LOG_ERR { pthread_t t = pthread_self(); std::cerr << "--- BEGIN ERROR MSG ---" << std::endl << "Physical: [Node " << args->numa_node << " | Thread " << t << "]" << std::endl; } std::cerr << LOG_CODE_INFO
// Aborts the enclosing function if `status` is not dml::status_code::ok:
// logs the status (via StatusCodeToString) and `msg` through LOG_ERR, stores
// the status in args->status and executes `return nullptr`.
// The enclosing function therefore has to return a pointer type and have
// `args` in scope. `status` is expanded several times, so pass a variable
// rather than an expression with side effects.
#define CHECK_STATUS(status,msg) { if (status != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(status) << std::endl << msg << std::endl; args->status = status; return nullptr; }}
/// Benchmark worker that runs as the start routine of one pthread.
///
/// Restricts the calling thread to args->numa_node, allocates a source and a
/// destination buffer of args->size bytes on args->nnode_src / args->nnode_dst
/// and performs args->rep_count timed copy repetitions through DML. Each
/// repetition begins with a wait on the shared barrier and either submits one
/// batch of args->batch_size copies (args->batch_submit) or a single mem_copy.
///
/// Written back into the TaskData: status, rep_completed and, when all
/// repetitions succeeded, the averaged submit / complete / combined durations
/// in microseconds. On the first failing operation the status is stored, the
/// function returns immediately and the durations are left untouched.
///
/// @tparam path execution path type forwarded to dml::submit
/// @param argp  pointer to the TaskData of this task
/// @return always nullptr
template <typename path>
void* thread_function(void* argp) {
    // Owns one numa_alloc_onnode region and releases it when the scope is
    // left, which also covers the early returns performed by CHECK_STATUS.
    struct NumaRegion {
        void* ptr;
        size_t bytes;

        NumaRegion(size_t size, int node) : ptr(numa_alloc_onnode(size, node)), bytes(size) {}
        NumaRegion(const NumaRegion&) = delete;
        NumaRegion& operator=(const NumaRegion&) = delete;

        ~NumaRegion() {
            if (ptr != nullptr) {
                numa_free(ptr, bytes);
            }
        }
    };

    using hr_clock = std::chrono::high_resolution_clock;

    // the macros LOG_ERR and CHECK_STATUS rely on this variable being named "args"
    TaskData* args = static_cast<TaskData*>(argp);

    std::vector<uint64_t> submission_durations;
    std::vector<uint64_t> completion_durations;
    std::vector<uint64_t> combined_durations;

    submission_durations.reserve(args->rep_count);
    completion_durations.reserve(args->rep_count);
    combined_durations.reserve(args->rep_count);

    // stores the three intervals of one repetition in microseconds
    const auto record_durations = [&](const hr_clock::time_point start, const hr_clock::time_point submitted, const hr_clock::time_point completed) {
        submission_durations.emplace_back(std::chrono::duration_cast<std::chrono::microseconds>(submitted - start).count());
        completion_durations.emplace_back(std::chrono::duration_cast<std::chrono::microseconds>(completed - submitted).count());
        combined_durations.emplace_back(std::chrono::duration_cast<std::chrono::microseconds>(completed - start).count());
    };

    // run the current thread on the requested numa node
    numa_run_on_node(args->numa_node);

    // allocate memory for the move operation on the requested numa nodes
    NumaRegion src(args->size, args->nnode_src);
    NumaRegion dst(args->size, args->nnode_dst);

    dml::data_view srcv = dml::make_view(static_cast<uint8_t*>(src.ptr), args->size);
    dml::data_view dstv = dml::make_view(static_cast<uint8_t*>(dst.ptr), args->size);

    args->status = dml::status_code::ok;
    args->rep_completed = 0;

    // NOTE(review): a thread that returns early through CHECK_STATUS no longer
    // reaches the barrier in later repetitions; if barrier::wait() needs every
    // participant, the remaining threads would block - confirm against barrier.hpp

    for (uint32_t i = 0; i < args->rep_count; i++) {
        // synchronize the start of each iteration
        // using the barrier structure
        args->barrier_->wait();

        if (args->batch_submit) {
            const auto nop_interval = args->barrier_after_n_operations;

            // The loop below appends a nop after the copies with index
            // 0, N, 2N, ... which amounts to ceil(batch_size / N) nops, so
            // the sequence must be sized with the rounded-up quotient.
            // NOTE(review): the field name suggests a nop after every N-th
            // copy instead - confirm which placement is intended
            uint32_t opcount = args->batch_size;

            if (nop_interval > 0) {
                opcount += args->batch_size / nop_interval;

                if (args->batch_size % nop_interval != 0) {
                    opcount++;
                }
            }

            const auto st = hr_clock::now();

            auto sequence = dml::sequence(opcount, std::allocator<dml::byte_t>());

            for (uint32_t j = 0; j < args->batch_size; j++) {
                const auto copy_status = sequence.add(dml::mem_copy, srcv, dstv);
                CHECK_STATUS(copy_status, "Adding a copy operation to the batch failed!");

                // an interval of zero disables the nops and must not reach the modulo
                if (nop_interval > 0 && j % nop_interval == 0) {
                    const auto nop_status = sequence.add(dml::nop);
                    CHECK_STATUS(nop_status, "Adding a nop operation to the batch failed!");
                }
            }

            auto handler = dml::submit<path>(dml::batch, sequence);

            const auto se = hr_clock::now();

            auto result = handler.get();

            const auto et = hr_clock::now();

            const dml::status_code status = result.status;
            CHECK_STATUS(status, "Batch completed with an Error!");

            record_durations(st, se, et);
        }
        else {
            const auto st = hr_clock::now();

            // we use the asynchronous submit-routine even though this is not required
            // here, however the project later on will only use async operation and
            // therefore this behaviour should be benchmarked
            auto handler = dml::submit<path>(dml::mem_copy, srcv, dstv);

            const auto se = hr_clock::now();

            auto result = handler.get();

            const auto et = hr_clock::now();

            const dml::status_code status = result.status;
            CHECK_STATUS(status, "Operation completed with an Error!");

            record_durations(st, se, et);
        }

        args->rep_completed++;
    }

    args->combined_duration = avg(combined_durations);
    args->complete_duration = avg(completion_durations);
    args->submit_duration = avg(submission_durations);

    // src and dst are released by the NumaRegion destructors
    return nullptr;
}
template <typename path>
void execute_dml_memcpy(std::vector<TaskData>& args) {
barrier task_barrier(args.size());
std::vector<pthread_t> threads;
// initialize numa library
numa_available();
// for each submitted task we link the semaphore
// and create the thread, passing the argument
for (auto& arg : args) {
arg.barrier_ = &task_barrier;
threads.emplace_back();
if (pthread_create(&threads.back(), nullptr, thread_function<path>, &arg) != 0) {
std::cerr << "Error creating thread" << std::endl;
exit(1);
}
}
for (pthread_t& t : threads) {
pthread_join(t, nullptr);
}
}