This contains my bachelors thesis and associated tex files, code snippets and maybe more. Topic: Data Movement in Heterogeneous Memories with Intel Data Streaming Accelerator
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

123 lines
4.5 KiB

  1. #pragma once
  2. #include <iostream>
  3. #include <vector>
  4. #include <chrono>
  5. #include <numeric>
  6. #include <future>
  7. #include <pthread.h>
  8. #include <semaphore.h>
  9. #include <numa.h>
  10. #include <dml/dml.hpp>
  11. #include "util/dml-helper.hpp"
  12. #include "util/task-data.hpp"
  13. #define LOG_CODE_INFO "Location: " << __FILE__ << "@" << __LINE__ << "::" << __FUNCTION__ << std::endl
  14. #define LOG_ERR { pthread_t t = pthread_self(); std::cerr << "--- BEGIN ERROR MSG ---" << std::endl << "Physical: [Node " << args->numa_node << " | Thread " << t << "]" << std::endl; } std::cerr << LOG_CODE_INFO
  15. #define CHECK_STATUS(status,msg) { if (status != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(status) << std::endl << msg << std::endl; args->status = status; return nullptr; }}
  16. std::shared_future<void> LAUNCH_;
  17. template <typename path>
  18. void* thread_function(void* argp) {
  19. TaskData* args = reinterpret_cast<TaskData*>(argp);
  20. // set numa node and core affinity of the current thread
  21. numa_run_on_node(args->numa_node);
  22. // allocate memory for the move operation on the requested numa nodes
  23. void* src = numa_alloc_onnode(args->size, args->nnode_src);
  24. void* dst = numa_alloc_onnode(args->size, args->nnode_dst);
  25. dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(src), args->size);
  26. dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(dst), args->size);
  27. std::memset(src, 0, args->size);
  28. std::memset(dst, 0, args->size);
  29. args->status = dml::status_code::ok;
  30. args->rep_completed = 0;
  31. LAUNCH_.wait();
  32. if (args->batch_size > 1) {
  33. auto sequence = dml::sequence(args->batch_size, std::allocator<dml::byte_t>());
  34. for (uint32_t j = 0; j < args->batch_size; j++) {
  35. // block_on_fault() is required to submit the task in a way so that the
  36. // DSA engine can handle page faults itself together with the IOMMU which
  37. // requires the WQ to be configured to allow this too
  38. const auto status = sequence.add(dml::mem_copy.block_on_fault(), srcv, dstv);
  39. CHECK_STATUS(status, "Adding operation to batch failed!");
  40. }
  41. // we use the asynchronous submit-routine even though this is not required
  42. // here, however the project later on will only use async operation and
  43. // therefore this behaviour should be benchmarked
  44. auto handler = dml::submit<path>(dml::batch, sequence);
  45. auto result = handler.get();
  46. const dml::status_code status = result.status;
  47. CHECK_STATUS(status, "Batch completed with an Error!");
  48. }
  49. else {
  50. // we use the asynchronous submit-routine even though this is not required
  51. // here, however the project later on will only use async operation and
  52. // therefore this behaviour should be benchmarked
  53. // block_on_fault() is required to submit the task in a way so that the
  54. // DSA engine can handle page faults itself together with the IOMMU which
  55. // requires the WQ to be configured to allow this too
  56. auto handler = dml::submit<path>(dml::mem_copy.block_on_fault(), srcv, dstv);
  57. auto result = handler.get();
  58. const dml::status_code status = result.status;
  59. CHECK_STATUS(status, "Operation completed with an Error!");
  60. }
  61. // free the allocated memory regions on the selected nodes
  62. numa_free(src, args->size);
  63. numa_free(dst, args->size);
  64. return nullptr;
  65. }
  66. template <typename path>
  67. std::vector<uint64_t> execute_dml_memcpy(std::vector<TaskData>& args, const uint64_t iterations) {
  68. std::vector<uint64_t> timing;
  69. // initialize numa library
  70. numa_available();
  71. // for each submitted task we link the semaphore
  72. // and create the thread, passing the argument
  73. for (uint64_t i = 0; i < iterations + 5; i++) {
  74. std::vector<std::thread> threads;
  75. std::promise<void> launch_promise;
  76. LAUNCH_ = launch_promise.get_future();
  77. for (auto& arg : args) {
  78. threads.emplace_back(thread_function<path>, &arg);
  79. }
  80. using namespace std::chrono_literals;
  81. std::this_thread::sleep_for(1000ms);
  82. const auto time_start = std::chrono::steady_clock::now();
  83. launch_promise.set_value();
  84. for(std::thread& t : threads) { t.join(); }
  85. const auto time_end = std::chrono::steady_clock::now();
  86. if (i >= 5) timing.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(time_end - time_start).count());
  87. }
  88. return timing;
  89. }