#pragma once

#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <vector>
#include <chrono>
#include <limits>

#include <pthread.h>
#include <semaphore.h>
#include <numa.h>

#include <dml/dml.hpp>

#include "util/barrier.hpp"
#include "util/dml-helper.hpp"
#include "util/task-data.hpp"
#include "util/array_utils.h"

#define LOG_CODE_INFO "Location: " << __FILE__ << "@" << __LINE__ << "::" << __FUNCTION__ << std::endl
#define LOG_ERR { pthread_t t = pthread_self(); std::cerr << "--- BEGIN ERROR MSG ---" << std::endl << "Physical: [Node " << args->numa_node << " | Thread " << t << "]" << std::endl; } std::cerr << LOG_CODE_INFO
#define CHECK_STATUS(status,msg) { if (status != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(status) << std::endl << msg << std::endl; args->status = status; return nullptr; }}

// timings are recorded per repetition; nanoseconds are used as the duration unit here
#define ADD_TIMING_MEASUREMENT { if (i >= 5) { args->submit_duration.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(se - st).count()); args->complete_duration.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(et - se).count()); args->combined_duration.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(et - st).count()); }}

// path selects the DML execution path (dml::hardware, dml::software or dml::automatic)
template <typename path>
void* thread_function(void* argp) {
    TaskData* args = reinterpret_cast<TaskData*>(argp);

    // set numa node and core affinity of the current thread
    numa_run_on_node(args->numa_node);

    // allocate memory for the move operation on the requested numa nodes
    void* src = numa_alloc_onnode(args->size, args->nnode_src);
    void* dst = numa_alloc_onnode(args->size, args->nnode_dst);
    dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(src), args->size);
    dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(dst), args->size);

    // fill the source buffer with random data so its pages are mapped before the copy
    fill_mt(reinterpret_cast<uint8_t*>(src), args->size, std::numeric_limits<uint8_t>::min(), std::numeric_limits<uint8_t>::max());

    args->status = dml::status_code::ok;
    args->rep_completed = 0;

    // we add 5 as the first 5 iterations will not be measured
    // to remove exceptional values encountered during warmup
    for (uint32_t i = 0; i < args->rep_count + 5; i++) {
        // synchronize the start of each iteration
        // using the barrier structure
        args->barrier_->wait();

        if (args->batch_submit) {
            const auto st = std::chrono::steady_clock::now();

            auto sequence = dml::sequence(args->batch_size, std::allocator<uint8_t>());

            for (uint32_t j = 0; j < args->batch_size; j++) {
                // block_on_fault() is required to submit the task in a way so that the
                // DSA engine can handle page faults itself together with the IOMMU which
                // requires the WQ to be configured to allow this too
                const auto status = sequence.add(dml::mem_copy.block_on_fault(), srcv, dstv);
                CHECK_STATUS(status, "Adding operation to batch failed!");
            }

            // we use the asynchronous submit-routine even though this is not required
            // here, however the project later on will only use async operation and
            // therefore this behaviour should be benchmarked
            auto handler = dml::submit<path>(dml::batch, sequence);

            const auto se = std::chrono::steady_clock::now();

            auto result = handler.get();

            const auto et = std::chrono::steady_clock::now();

            const dml::status_code status = result.status;
            CHECK_STATUS(status, "Batch completed with an Error!");

            ADD_TIMING_MEASUREMENT;
        }
        else if (args->batch_size > 1) {
            // implementation for non-batched multi-submit follows here:
            // we submit the work as multiple single descriptors
            // and only wait for their completion afterwards

            std::vector<dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>>> handlers;

            const auto st = std::chrono::steady_clock::now();

            for (uint32_t j = 0; j < args->batch_size; j++) {
                // block_on_fault() is required to submit the task in a way so that the
                // DSA engine can handle page faults itself together with the IOMMU which
                // requires the WQ to be configured to allow this too
                handlers.emplace_back(dml::submit<path>(dml::mem_copy.block_on_fault(), srcv, dstv));
            }

            const auto se = std::chrono::steady_clock::now();

            for (auto& handler : handlers) {
                auto result = handler.get();
                const dml::status_code status = result.status;
                CHECK_STATUS(status, "Operation completed with an Error!");
            }

            const auto et = std::chrono::steady_clock::now();

            ADD_TIMING_MEASUREMENT;
        }
        else {
            const auto st = std::chrono::steady_clock::now();

            // we use the asynchronous submit-routine even though this is not required
            // here, however the project later on will only use async operation and
            // therefore this behaviour should be benchmarked
            // block_on_fault() is required to submit the task in a way so that the
            // DSA engine can handle page faults itself together with the IOMMU which
            // requires the WQ to be configured to allow this too
            auto handler = dml::submit<path>(dml::mem_copy.block_on_fault(), srcv, dstv);

            const auto se = std::chrono::steady_clock::now();

            auto result = handler.get();

            const auto et = std::chrono::steady_clock::now();

            const dml::status_code status = result.status;
            CHECK_STATUS(status, "Operation completed with an Error!");

            ADD_TIMING_MEASUREMENT;
        }

        // again: we do not count the first 5 repetitions
        if (i >= 5) args->rep_completed++;
    }

    // free the allocated memory regions on the selected nodes
    numa_free(src, args->size);
    numa_free(dst, args->size);

    return nullptr;
}

template <typename path>
void execute_dml_memcpy(std::vector<TaskData>& args) {
    barrier task_barrier(args.size());
    std::vector<pthread_t> threads;

    // initialize the numa library
    numa_available();

    // for each submitted task we link the shared barrier
    // and create the thread, passing the task as argument
    for (auto& arg : args) {
        arg.barrier_ = &task_barrier;
        threads.emplace_back();

        if (pthread_create(&threads.back(), nullptr, thread_function<path>, &arg) != 0) {
            std::cerr << "Error creating thread" << std::endl;
            exit(1);
        }
    }

    for (pthread_t& t : threads) {
        pthread_join(t, nullptr);
    }
}
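/*
Usage sketch (illustrative only, not part of this header): the benchmark is driven by
filling a std::vector<TaskData> and instantiating execute_dml_memcpy with one of the
DML execution paths. The field names below mirror the members accessed above; the
include name is hypothetical, and any fields that differ in util/task-data.hpp must
be adjusted accordingly.

    #include "dml-memcpy.hpp"  // hypothetical name for this header

    int main() {
        std::vector<TaskData> tasks(1);
        tasks[0].size = 1 << 20;        // bytes copied per repetition (1 MiB)
        tasks[0].rep_count = 100;       // measured repetitions (5 warmup reps are added internally)
        tasks[0].batch_size = 1;        // one descriptor per repetition
        tasks[0].batch_submit = false;  // do not use the DSA batch descriptor
        tasks[0].numa_node = 0;         // node the worker thread is pinned to
        tasks[0].nnode_src = 0;         // node holding the source buffer
        tasks[0].nnode_dst = 1;         // node holding the destination buffer

        // dml::hardware offloads to the DSA engine; dml::software and
        // dml::automatic are the other available execution paths
        execute_dml_memcpy<dml::hardware>(tasks);
        return 0;
    }
*/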