This repository contains my bachelor's thesis, the associated TeX files, code snippets, and possibly more. Topic: Data Movement in Heterogeneous Memories with the Intel Data Streaming Accelerator
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

161 lines
6.0 KiB

#pragma once
#include <numa.h>
#include <chrono>
#include <cstring>
#include <future>
#include <iostream>
#include <thread>
#include <vector>
#include <vector>
#include <dml/dml.hpp>
#include "util/dml-helper.hpp"
#include "util/task-data.hpp"
// Stream fragment identifying the current source location (file@line::function).
#define LOG_CODE_INFO "Location: " << __FILE__ << "@" << __LINE__ << "::" << __FUNCTION__ << std::endl
// Emits an error header with the physical placement of the failing task.
// NOTE: relies on `task` (TaskData*) and `tid` being in scope at the expansion site.
#define LOG_ERR { std::cerr << "--- BEGIN ERROR MSG ---" << std::endl << "Physical: [Node " << task->numa_node << " | Thread " << tid << "]" << std::endl; } std::cerr << LOG_CODE_INFO
// Checks a dml::status_code; on failure logs the code plus `msg` and returns
// from the enclosing void function (so it is only usable in void contexts).
#define CHECK_STATUS(stat,msg) { if (stat != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(stat) << std::endl << msg << std::endl; return; }}
  14. std::shared_future<void> LAUNCH_;
  15. std::vector<uint64_t> ITERATION_TIMING_;
  16. std::vector<std::vector<void*>> SOURCE_;
  17. std::vector<std::vector<void*>> DESTINATION_;
// Worker body for one benchmark thread: blocks on the global launch future,
// then performs task->rep_count copies of task->size bytes from its SOURCE_
// buffer to its DESTINATION_ buffer — either as one DSA batch per repetition
// (batch_size > 1) or as a single mem_copy submission per repetition.
// @tparam path  dml execution path type selecting software or hardware offload
// @param tid    thread id; indexes SOURCE_[tid]/DESTINATION_[tid]
// @param task   benchmark parameters; also read by LOG_ERR/CHECK_STATUS,
//               which return from this function on any dml error
template <typename path>
void thread_function(const uint32_t tid, TaskData* task) {
  // Wait for the coordinator to release all threads simultaneously.
  LAUNCH_.wait();
  for (uint32_t i = 0; i < task->rep_count; i++) {
    // Fresh views per repetition over this thread's r-th buffer pair.
    dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(SOURCE_[tid][i]), task->size);
    dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(DESTINATION_[tid][i]), task->size);
    if (task->batch_size > 1) {
      // Batch mode: enqueue batch_size identical copy operations of the same
      // src/dst pair into one descriptor sequence, submitted as a unit.
      auto sequence = dml::sequence(task->batch_size, std::allocator<dml::byte_t>());
      for (uint32_t j = 0; j < task->batch_size; j++) {
        const auto status = sequence.add(dml::mem_copy, srcv, dstv);
        CHECK_STATUS(status, "Adding operation to batch failed!");
      }
      // we use the asynchronous submit-routine even though this is not required
      // here, however the project later on will only use async operation and
      // therefore this behaviour should be benchmarked
      auto handler = dml::submit<path>(dml::batch, sequence, dml::execution_interface<path, std::allocator<dml::byte_t>>(), task->numa_node);
      // get() blocks until the batch completes.
      auto result = handler.get();
      const dml::status_code status = result.status;
      CHECK_STATUS(status, "Batch completed with an Error!");
    }
    else {
      // we use the asynchronous submit-routine even though this is not required
      // here, however the project later on will only use async operation and
      // therefore this behaviour should be benchmarked
      auto handler = dml::submit<path>(dml::mem_copy, srcv, dstv, dml::execution_interface<path, std::allocator<dml::byte_t>>(), task->numa_node);
      // get() blocks until the single copy completes.
      auto result = handler.get();
      const dml::status_code status = result.status;
      CHECK_STATUS(status, "Operation completed with an Error!");
    }
  }
}
  49. template <typename path>
  50. void flush_cache(std::vector<TaskData>& args) {
  51. auto flush_container = [&args](std::vector<std::vector<void*>>& container) {
  52. if (container.size() != args.size()) {
  53. std::cerr << LOG_CODE_INFO << "Failed Clearing Cache due to size missmatch between tasks and entries!";
  54. exit(-1);
  55. }
  56. for (uint32_t i = 0; i < args.size(); i++) {
  57. for (auto ptr : container[i]) {
  58. dml::data_view view = dml::make_view(reinterpret_cast<uint8_t*>(ptr), args[i].size);
  59. auto result = dml::execute<path>(dml::cache_flush, view);
  60. if (result.status != dml::status_code::ok) {
  61. std::cerr << LOG_CODE_INFO << "Failed Clearing Cache!";
  62. exit(-1);
  63. }
  64. }
  65. }
  66. };
  67. flush_container(DESTINATION_);
  68. flush_container(SOURCE_);
  69. }
  70. void alloc_data_fields(std::vector<TaskData>& args) {
  71. SOURCE_.resize(args.size());
  72. DESTINATION_.resize(args.size());
  73. for (uint32_t tid = 0; tid < args.size(); tid++) {
  74. DESTINATION_[tid].resize(args[tid].rep_count);
  75. SOURCE_[tid].resize(args[tid].rep_count);
  76. for (uint32_t r = 0; r < args[tid].rep_count; r++) {
  77. SOURCE_[tid][r] = numa_alloc_onnode(args[tid].size, args[tid].nnode_src);
  78. DESTINATION_[tid][r] = numa_alloc_onnode(args[tid].size, args[tid].nnode_dst);
  79. std::memset(SOURCE_[tid][r], 0xAB, args[tid].size);
  80. std::memset(DESTINATION_[tid][r], 0xAB, args[tid].size);
  81. }
  82. }
  83. }
  84. void dealloc_data_fields(std::vector<TaskData>& args) {
  85. for (uint32_t tid = 0; tid < args.size(); tid++) {
  86. for (uint32_t r = 0; r < args[tid].rep_count; r++) {
  87. numa_free(SOURCE_[tid][r], args[tid].size);
  88. numa_free(DESTINATION_[tid][r], args[tid].size);
  89. }
  90. }
  91. SOURCE_.clear();
  92. DESTINATION_.clear();
  93. }
  94. template <typename path>
  95. void execute_dml_memcpy(std::vector<TaskData>& args, const uint64_t iterations) {
  96. // initialize numa library
  97. numa_available();
  98. // initialize data fields for use
  99. alloc_data_fields(args);
  100. // for each requested iteration this is repeated, plus 5 iterations as warmup
  101. for (uint64_t i = 0; i < iterations + 5; i++) {
  102. std::vector<std::thread> threads;
  103. std::promise<void> launch_promise;
  104. LAUNCH_ = launch_promise.get_future();
  105. // we flush the cache for the memory regions to avoid any caching effects
  106. flush_cache<path>(args);
  107. // for each requested task we spawn a thread and pass the task description
  108. // and the thread id for accessing per-thread source and data pointers
  109. for (uint32_t tid = 0; tid < args.size(); tid++) {
  110. threads.emplace_back(thread_function<path>, tid, &args[tid]);
  111. }
  112. // sleep shortly, hopefully after this all threads have reached the barrier
  113. using namespace std::chrono_literals;
  114. std::this_thread::sleep_for(1ms);
  115. const auto time_start = std::chrono::steady_clock::now();
  116. launch_promise.set_value();
  117. for(std::thread& t : threads) { t.join(); }
  118. const auto time_end = std::chrono::steady_clock::now();
  119. if (i >= 5) ITERATION_TIMING_.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(time_end - time_start).count());
  120. }
  121. dealloc_data_fields(args);
  122. }