@@ -3,53 +3,39 @@
#include <iostream>
#include <vector>
#include <chrono>
#include <numeric>
#include <future>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
#include <numa.h>
#include <dml/dml.hpp>
#include "util/dml-helper.hpp"
#include "util/task-data.hpp"
#define LOG_CODE_INFO "Location: " << __FILE__ << "@" << __LINE__ << "::" << __FUNCTION__ << std::endl
#define LOG_ERR { pthread_t t = pthread_self(); std::cerr << "--- BEGIN ERROR MSG ---" << std::endl << "Physical: [Node " << args->numa_node << " | Thread " << t << "]" << std::endl; } std::cerr << LOG_CODE_INFO
#define CHECK_STATUS(status,msg) { if (status != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(status) << std::endl << msg << std::endl; args->status = status; return nullptr; }}
#define LOG_ERR { std::cerr << "--- BEGIN ERROR MSG ---" << std::endl << "Physical: [Node " << task->numa_node << " | Thread " << tid << "]" << std::endl; } std::cerr << LOG_CODE_INFO
#define CHECK_STATUS(stat,msg) { if (stat != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(stat) << std::endl << msg << std::endl; task->status = stat; return; }}
std::shared_future<void> LAUNCH_;
template <typename path>
void* thread_function(void* argp) {
    TaskData* args = reinterpret_cast<TaskData*>(argp);
    // set numa node and core affinity of the current thread
    numa_run_on_node(args->numa_node);
    // allocate memory for the move operation on the requested numa nodes
    void* src = numa_alloc_onnode(args->size, args->nnode_src);
    void* dst = numa_alloc_onnode(args->size, args->nnode_dst);
    dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(src), args->size);
    dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(dst), args->size);
std::vector<uint64_t> ITERATION_TIMING_;
std::vector<void*> SOURCE_;
std::vector<void*> DESTINATION_;
    std::memset(src, 0, args->size);
    std::memset(dst, 0, args->size);
template <typename path>
void thread_function(const uint32_t tid, TaskData* task) {
    dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(SOURCE_[tid]), task->size);
    dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(DESTINATION_[tid]), task->size);
    args->status = dml::status_code::ok;
    task->status = dml::status_code::ok;
    LAUNCH_.wait();
    if (args->batch_size > 1) {
        auto sequence = dml::sequence(args->batch_size, std::allocator<dml::byte_t>());
        for (uint32_t j = 0; j < args->batch_size; j++) {
            // block_on_fault() is required to submit the task in a way so that the
            // DSA engine can handle page faults itself together with the IOMMU which
            // requires the WQ to be configured to allow this too
    if (task->batch_size > 1) {
        auto sequence = dml::sequence(task->batch_size, std::allocator<dml::byte_t>());
            const auto status = sequence.add(dml::mem_copy.block_on_fault(), srcv, dstv);
        for (uint32_t j = 0; j < task->batch_size; j++) {
            const auto status = sequence.add(dml::mem_copy, srcv, dstv);
            CHECK_STATUS(status, "Adding operation to batch failed!");
        }
@@ -57,7 +43,7 @@ void* thread_function(void* argp) {
        // here, however the project later on will only use async operation and
        // therefore this behaviour should be benchmarked
        auto handler = dml::submit<path>(dml::batch, sequence);
        auto handler = dml::submit<path>(dml::batch, sequence, dml::execution_interface<path, std::allocator<dml::byte_t>>(), task->numa_node);
        auto result = handler.get();
@@ -68,45 +54,55 @@ void* thread_function(void* argp) {
        // we use the asynchronous submit-routine even though this is not required
        // here, however the project later on will only use async operation and
        // therefore this behaviour should be benchmarked
        // block_on_fault() is required to submit the task in a way so that the
        // DSA engine can handle page faults itself together with the IOMMU which
        // requires the WQ to be configured to allow this too
        auto handler = dml::submit<path>(dml::mem_copy.block_on_fault(), srcv, dstv);
        auto handler = dml::submit<path>(dml::mem_copy, srcv, dstv, dml::execution_interface<path, std::allocator<dml::byte_t>>(), task->numa_node);
        auto result = handler.get();
        const dml::status_code status = result.status;
        CHECK_STATUS(status, "Operation completed with an Error!");
    }
    // free the allocated memory regions on the selected nodes
    numa_free(src, args->size);
    numa_free(dst, args->size);
    return nullptr;
}
template <typename path>
std::vector<uint64_t> execute_dml_memcpy(std::vector<TaskData>& args, const uint64_t iterations) {
    std::vector<uint64_t> timing;
void execute_dml_memcpy(std::vector<TaskData>& args, const uint64_t iterations) {
    // initialize numa library
    numa_available();
    // for each submitted task we link the semaphore
    // and create the thread, passing the argument
    // initialize data fields for use
    for (uint32_t tid = 0; tid < args.size(); tid++) {
        SOURCE_[tid] = numa_alloc_onnode(args[tid].size, args[tid].nnode_src);
        DESTINATION_[tid] = numa_alloc_onnode(args[tid].size, args[tid].nnode_dst);
        std::memset(SOURCE_[tid], 0xAB, args[tid].size);
        std::memset(DESTINATION_[tid], 0xAB, args[tid].size);
    }
    // for each requested iteration this is repeated, plus 5 iterations as warmup
    for (uint64_t i = 0; i < iterations + 5; i++) {
        std::vector<std::thread> threads;
        std::promise<void> launch_promise;
        LAUNCH_ = launch_promise.get_future();
        for (auto& arg : args) {
            threads.emplace_back(thread_function<path>, &arg);
        for (uint32_t tid = 0; tid < args.size(); tid++) {
            // we flush the cache for the memory regions to avoid any caching effects
            dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(SOURCE_[tid]), args[tid].size);
            dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(DESTINATION_[tid]), args[tid].size);
            auto rsrc = dml::execute<dml::software>(dml::cache_flush, srcv);
            auto rdst = dml::execute<dml::software>(dml::cache_flush, dstv);
            TaskData* task = &args[tid];
            CHECK_STATUS(rsrc.status, "Flushing Cache for Source failed!");
            CHECK_STATUS(rdst.status, "Flushing Cache for Destination failed!");
            // then spawn the thread
            threads.emplace_back(thread_function<path>, tid, &args[tid]);
        }
        using namespace std::chrono_literals;
        std::this_thread::sleep_for(100ms);
        std::this_thread::sleep_for(1ms);
        const auto time_start = std::chrono::steady_clock::now();
@@ -116,8 +112,6 @@ std::vector<uint64_t> execute_dml_memcpy(std::vector<TaskData>& args, const uint
        const auto time_end = std::chrono::steady_clock::now();
        if (i >= 5) timing.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(time_end - time_start).count());
        if (i >= 5) ITERATION_TIMING_.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(time_end - time_start).count());
    }
    return timing;
}