This repository contains my bachelor's thesis, the associated TeX sources, code snippets, and possibly more. Topic: Data Movement in Heterogeneous Memories with the Intel Data Streaming Accelerator

#pragma once

#include <cstdint>
#include <iostream>
#include <vector>
#include <chrono>

#include <pthread.h>
#include <semaphore.h>
#include <numa.h>

#include <dml/dml.hpp>

#include "statuscode-tostring.hpp"

struct ThreadArgs {
    // thread placement / engine selection
    uint8_t numa_node;
    uint8_t core;
    // region size and source/destination numa nodes for the copy
    size_t size;
    uint8_t nnode_src;
    uint8_t nnode_dst;
    // repetition
    uint32_t rep_count;
    bool batch_submit;
    uint32_t batch_size;
    uint32_t barrier_after_n_operations;
    // thread output
    dml::status_code status;
    // average run durations in microseconds
    double combined_duration;
    double submit_duration;
    double complete_duration;
    // completed iterations
    uint32_t rep_completed;
    // set by execution
    sem_t* sig;
};

// incremental (running) mean, avoids accumulating a large sum first
inline double avg(const std::vector<double>& v) {
    int n = 0;
    double mean = 0.0;
    for (const double x : v) {
        const double delta = x - mean;
        mean += delta / ++n;
    }
    return mean;
}

#define LOG_CODE_INFO "Location: " << __FILE__ << "@" << __LINE__ << "::" << __FUNCTION__ << std::endl
#define LOG_ERR { pthread_t t = pthread_self(); std::cerr << "--- BEGIN ERROR MSG ---" << std::endl << "Physical: [Node " << static_cast<int>(args->numa_node) << " | Core " << static_cast<int>(args->core) << " | Thread " << t << "]" << std::endl; } std::cerr << LOG_CODE_INFO
#define CHECK_STATUS(status,msg) { if ((status) != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(status) << std::endl << (msg) << std::endl; args->status = (status); return nullptr; }}
template <typename path>
void* thread_function(void* argp) {
    ThreadArgs* args = reinterpret_cast<ThreadArgs*>(argp);

    std::vector<double> submission_durations;
    std::vector<double> completion_durations;
    std::vector<double> combined_durations;

    // set numa node and core affinity of the current thread
    numa_run_on_node(args->numa_node);
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(args->core, &cpuset);
    if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset) != 0) {
        LOG_ERR << "Error setting affinity for thread" << std::endl;
        return nullptr;
    }

    // allocate memory for the copy operation on the requested numa nodes
    void* src = numa_alloc_onnode(args->size, args->nnode_src);
    void* dst = numa_alloc_onnode(args->size, args->nnode_dst);
    if (src == nullptr || dst == nullptr) {
        LOG_ERR << "Error allocating memory on the requested nodes" << std::endl;
        return nullptr;
    }
    dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(src), args->size);
    dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(dst), args->size);

    args->status = dml::status_code::ok;
    args->rep_completed = 0;

    // wait for the start signal so that all threads begin their operations together
    sem_wait(args->sig);

    for (uint32_t i = 0; i < args->rep_count; i++) {
        if (args->batch_submit) {
            // reserve extra slots in the sequence for the nop barriers inserted below
            uint32_t opcount = args->batch_size;
            if (args->barrier_after_n_operations > 0) {
                opcount += opcount / args->barrier_after_n_operations;
            }

            const auto st = std::chrono::high_resolution_clock::now();

            auto sequence = dml::sequence(opcount, std::allocator<dml::byte_t>());
            for (uint32_t j = 0; j < args->batch_size; j++) {
                const auto status = sequence.add(dml::mem_copy, srcv, dstv);
                CHECK_STATUS(status, "Adding operation to batch failed!");
                // insert a nop barrier after every n operations, if requested
                if (args->barrier_after_n_operations > 0 && (j + 1) % args->barrier_after_n_operations == 0) {
                    sequence.add(dml::nop);
                }
            }
            auto handler = dml::submit<path>(dml::batch, sequence);

            const auto se = std::chrono::high_resolution_clock::now();

            auto result = handler.get();

            const auto et = std::chrono::high_resolution_clock::now();

            CHECK_STATUS(result.status, "Batch completed with an error!");

            submission_durations.emplace_back(std::chrono::duration_cast<std::chrono::microseconds>(se - st).count());
            completion_durations.emplace_back(std::chrono::duration_cast<std::chrono::microseconds>(et - se).count());
            combined_durations.emplace_back(std::chrono::duration_cast<std::chrono::microseconds>(et - st).count());
        }
        else {
            const auto st = std::chrono::high_resolution_clock::now();

            // we use the asynchronous submit-routine even though this is not required
            // here; the project will later only use asynchronous operation and
            // therefore this behaviour should be benchmarked
            auto handler = dml::submit<path>(dml::mem_copy, srcv, dstv);

            const auto se = std::chrono::high_resolution_clock::now();

            auto result = handler.get();

            const auto et = std::chrono::high_resolution_clock::now();

            CHECK_STATUS(result.status, "Operation completed with an error!");

            submission_durations.emplace_back(std::chrono::duration_cast<std::chrono::microseconds>(se - st).count());
            completion_durations.emplace_back(std::chrono::duration_cast<std::chrono::microseconds>(et - se).count());
            combined_durations.emplace_back(std::chrono::duration_cast<std::chrono::microseconds>(et - st).count());
        }
        args->rep_completed++;
    }

    // free the allocated memory regions on the selected nodes
    numa_free(src, args->size);
    numa_free(dst, args->size);

    args->combined_duration = avg(combined_durations);
    args->complete_duration = avg(completion_durations);
    args->submit_duration = avg(submission_durations);
    args->sig = nullptr;

    return nullptr;
}
template <typename path>
void execute_dml_memcpy(std::vector<ThreadArgs>& args) {
    sem_t sem;
    std::vector<pthread_t> threads;

    // initialize the start semaphore and the numactl library
    sem_init(&sem, 0, 0);
    if (numa_available() < 0) {
        std::cerr << "NUMA is not available on this system" << std::endl;
        exit(1);
    }

    // for each submitted task we link the start semaphore
    // and create the thread, passing its argument block
    for (auto& arg : args) {
        arg.sig = &sem;
        threads.emplace_back();
        if (pthread_create(&threads.back(), nullptr, thread_function<path>, &arg) != 0) {
            std::cerr << "Error creating thread" << std::endl;
            exit(1);
        }
    }

    // sem_post releases exactly one waiter, so post once per thread
    // to let all of them start
    for (size_t i = 0; i < threads.size(); i++) {
        sem_post(&sem);
    }

    for (pthread_t& t : threads) {
        pthread_join(t, nullptr);
    }

    sem_destroy(&sem);
}
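
For context, a minimal driver for this header might look like the sketch below. It is not part of the repository: the include name, the dml::hardware execution path, the 1 MiB region size, and the node/core numbers are assumptions chosen only to illustrate how ThreadArgs and execute_dml_memcpy fit together.

// usage sketch, assuming the header above is saved as "dml-memcpy.hpp" (hypothetical name)
#include "dml-memcpy.hpp"

int main() {
    std::vector<ThreadArgs> args(1);
    args[0].numa_node = 0;                 // run the worker thread on node 0
    args[0].core = 0;                      // pin it to core 0
    args[0].size = 1ull << 20;             // copy 1 MiB per repetition (assumed size)
    args[0].nnode_src = 0;                 // source buffer on node 0
    args[0].nnode_dst = 1;                 // destination buffer on node 1
    args[0].rep_count = 100;
    args[0].batch_submit = false;          // single submissions, no batching
    args[0].batch_size = 0;
    args[0].barrier_after_n_operations = 0;

    // dml::hardware routes the copy through the DSA engine;
    // dml::software and dml::automatic are the alternative execution paths.
    execute_dml_memcpy<dml::hardware>(args);

    std::cout << "status: " << StatusCodeToString(args[0].status)
              << " | avg combined: " << args[0].combined_duration << " us"
              << " | avg submit: " << args[0].submit_duration << " us"
              << " | avg complete: " << args[0].complete_duration << " us" << std::endl;
    return 0;
}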