@@ -14,23 +14,22 @@
 #define LOG_CODE_INFO "Location: " << __FILE__ << "@" << __LINE__ << "::" << __FUNCTION__ << std::endl
 #define LOG_ERR { std::cerr << "--- BEGIN ERROR MSG ---" << std::endl << "Physical: [Node " << task->numa_node << " | Thread " << tid << "]" << std::endl; } std::cerr << LOG_CODE_INFO
-#define CHECK_STATUS(stat,msg) { if (stat != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(stat) << std::endl << msg << std::endl; task->status = stat; return; }}
+#define CHECK_STATUS(stat,msg) { if (stat != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(stat) << std::endl << msg << std::endl; return; }}
 std::shared_future<void> LAUNCH_;
 std::vector<uint64_t> ITERATION_TIMING_;
-std::vector<void*> SOURCE_;
-std::vector<void*> DESTINATION_;
+std::vector<std::vector<void*>> SOURCE_;
+std::vector<std::vector<void*>> DESTINATION_;
 template <typename path>
 void thread_function(const uint32_t tid, TaskData* task) {
-    dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(SOURCE_[tid]), task->size);
-    dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(DESTINATION_[tid]), task->size);
+    task->status = dml::status_code::ok;
     LAUNCH_.wait();
     for (uint32_t i = 0; i < task->rep_count; i++) {
+        dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(SOURCE_[tid][i]), task->size);
+        dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(DESTINATION_[tid][i]), task->size);
         if (task->batch_size > 1) {
             auto sequence = dml::sequence(task->batch_size, std::allocator<dml::byte_t>());
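The hunk above switches SOURCE_ and DESTINATION_ to one buffer per repetition, so each pass of the loop copies from memory that has not been touched since its cache flush; the data views must therefore be rebuilt inside the loop for the buffer of the current repetition. Below is a minimal, self-contained sketch of the DML copy primitive this relies on, assuming Intel DML's public C++ API from <dml/dml.hpp>; the buffer size and the software execution path are placeholders, not the benchmark's configuration:

```cpp
#include <dml/dml.hpp>
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
    constexpr std::size_t size = 4096;
    std::vector<uint8_t> src(size, 0xAB);
    std::vector<uint8_t> dst(size, 0x00);

    // Views must wrap the exact buffer used for this repetition; the patch
    // above moves view creation into the loop for the same reason.
    dml::data_view srcv = dml::make_view(src.data(), src.size());
    dml::data_view dstv = dml::make_view(dst.data(), dst.size());

    // Synchronous copy on the software path; the benchmark selects the
    // path via its template parameter instead.
    auto result = dml::execute<dml::software>(dml::mem_move, srcv, dstv);
    if (result.status != dml::status_code::ok) {
        std::cerr << "mem_move failed" << std::endl;
        return 1;
    }
    return 0;
}
```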
@@ -62,25 +61,72 @@ void thread_function(const uint32_t tid, TaskData* task) {
             CHECK_STATUS(status, "Operation completed with an Error!");
         }
     }
 }
 template <typename path>
+void flush_cache(std::vector<TaskData>& args) {
+    auto flush_container = [&args](std::vector<std::vector<void*>>& container) {
+        if (container.size() != args.size()) {
+            std::cerr << LOG_CODE_INFO << "Failed Clearing Cache due to size mismatch between tasks and entries!";
+            exit(-1);
+        }
+        for (uint32_t i = 0; i < args.size(); i++) {
+            for (auto ptr : container[i]) {
+                dml::data_view view = dml::make_view(reinterpret_cast<uint8_t*>(ptr), args[i].size);
+                auto result = dml::execute<path>(dml::cache_flush, view);
+                if (result.status != dml::status_code::ok) {
+                    std::cerr << LOG_CODE_INFO << "Failed Clearing Cache!";
+                    exit(-1);
+                }
+            }
+        }
+    };
+    flush_container(DESTINATION_);
+    flush_container(SOURCE_);
+}
+void alloc_data_fields(std::vector<TaskData>& args) {
+    SOURCE_.resize(args.size());
+    DESTINATION_.resize(args.size());
+    for (uint32_t tid = 0; tid < args.size(); tid++) {
+        DESTINATION_[tid].resize(args[tid].rep_count);
+        SOURCE_[tid].resize(args[tid].rep_count);
+        for (uint32_t r = 0; r < args[tid].rep_count; r++) {
+            SOURCE_[tid][r] = numa_alloc_onnode(args[tid].size, args[tid].nnode_src);
+            DESTINATION_[tid][r] = numa_alloc_onnode(args[tid].size, args[tid].nnode_dst);
+            std::memset(SOURCE_[tid][r], 0xAB, args[tid].size);
+            std::memset(DESTINATION_[tid][r], 0xAB, args[tid].size);
+        }
+    }
+}
+void dealloc_data_fields(std::vector<TaskData>& args) {
+    for (uint32_t tid = 0; tid < args.size(); tid++) {
+        for (uint32_t r = 0; r < args[tid].rep_count; r++) {
+            numa_free(SOURCE_[tid][r], args[tid].size);
+            numa_free(DESTINATION_[tid][r], args[tid].size);
+        }
+    }
+    SOURCE_.clear();
+    DESTINATION_.clear();
+}
+template <typename path>
 void execute_dml_memcpy(std::vector<TaskData>& args, const uint64_t iterations) {
     // initialize numa library
     numa_available();
     // initialize data fields for use
-    SOURCE_.resize(args.size());
-    DESTINATION_.resize(args.size());
-    for (uint32_t tid = 0; tid < args.size(); tid++) {
-        SOURCE_[tid] = numa_alloc_onnode(args[tid].size, args[tid].nnode_src);
-        DESTINATION_[tid] = numa_alloc_onnode(args[tid].size, args[tid].nnode_dst);
-        std::memset(SOURCE_[tid], 0xAB, args[tid].size);
-        std::memset(DESTINATION_[tid], 0xAB, args[tid].size);
-    }
+    alloc_data_fields(args);
     // for each requested iteration this is repeated, plus 5 iterations as warmup
     for (uint64_t i = 0; i < iterations + 5; i++) {
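The new flush_cache and alloc_data_fields helpers combine libnuma node-local allocation with DML's cache_flush operation. Below is a compact sketch of both building blocks; the node index and size are placeholders, and unlike the patch it checks the numa_available() return value, which libnuma requires before any other call:

```cpp
#include <dml/dml.hpp>
#include <numa.h>
#include <cstdint>
#include <iostream>

int main() {
    // libnuma mandates this check; a negative result means no NUMA support.
    if (numa_available() < 0) {
        std::cerr << "NUMA not supported on this system" << std::endl;
        return 1;
    }
    constexpr std::size_t size = 4096;
    const int node = 0;  // placeholder node index

    // Allocate the buffer on a specific NUMA node, as alloc_data_fields does
    // per task and repetition with nnode_src/nnode_dst.
    void* buf = numa_alloc_onnode(size, node);
    if (buf == nullptr) return 1;

    // Evict the region from the cache hierarchy so a later copy reads from
    // DRAM, mirroring what flush_cache does for every repetition buffer.
    dml::data_view view = dml::make_view(reinterpret_cast<uint8_t*>(buf), size);
    auto result = dml::execute<dml::software>(dml::cache_flush, view);
    if (result.status != dml::status_code::ok) {
        std::cerr << "cache flush failed" << std::endl;
    }
    numa_free(buf, size);
    return 0;
}
```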
@@ -88,22 +134,16 @@ void execute_dml_memcpy(std::vector<TaskData>& args, const uint64_t iterations)
         std::promise<void> launch_promise;
         LAUNCH_ = launch_promise.get_future();
-        for (uint32_t tid = 0; tid < args.size(); tid++) {
-            // we flush the cache for the memory regions to avoid any caching effects
-            dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(SOURCE_[tid]), args[tid].size);
-            dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(DESTINATION_[tid]), args[tid].size);
-            auto rsrc = dml::execute<path>(dml::cache_flush, srcv);
-            auto rdst = dml::execute<path>(dml::cache_flush, dstv);
-            TaskData* task = &args[tid];
-            CHECK_STATUS(rsrc.status, "Flushing Cache for Source failed!");
-            CHECK_STATUS(rdst.status, "Flushing Cache for Destination failed!");
-            // then spawn the thread
+        // we flush the cache for the memory regions to avoid any caching effects
+        flush_cache<path>(args);
+        // for each requested task we spawn a thread and pass the task description
+        // and the thread id for accessing per-thread source and data pointers
+        for (uint32_t tid = 0; tid < args.size(); tid++) {
             threads.emplace_back(thread_function<path>, tid, &args[tid]);
         }
         // sleep shortly, hopefully after this all threads have reached the barrier
         using namespace std::chrono_literals;
         std::this_thread::sleep_for(1ms);
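Correct startup does not depend on the 1 ms sleep: every worker blocks on the shared_future, and set_value() releases them all at once; the sleep only makes it likely that no worker is still being scheduled when timing starts. Below is a self-contained sketch of this start-gate pattern, with worker count and workload as placeholders:

```cpp
#include <chrono>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

int main() {
    std::promise<void> launch_promise;
    std::shared_future<void> gate = launch_promise.get_future();

    std::vector<std::thread> threads;
    for (int tid = 0; tid < 4; tid++) {
        threads.emplace_back([gate]() {
            gate.wait();  // every worker blocks here until release
            // ... per-thread workload would run here ...
        });
    }

    using namespace std::chrono_literals;
    std::this_thread::sleep_for(1ms);  // heuristic: let workers reach the gate

    const auto time_start = std::chrono::steady_clock::now();
    launch_promise.set_value();        // release all workers at once
    for (auto& t : threads) t.join();
    const auto time_end = std::chrono::steady_clock::now();

    std::cout << std::chrono::duration_cast<std::chrono::nanoseconds>(time_end - time_start).count()
              << " ns" << std::endl;
    return 0;
}
```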
@@ -117,4 +157,6 @@
         if (i >= 5) ITERATION_TIMING_.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(time_end - time_start).count());
     }
+    dealloc_data_fields(args);
 }
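After this change, ITERATION_TIMING_ holds one wall-clock duration in nanoseconds per measured iteration, with the five warmup iterations excluded by the i >= 5 guard. Below is a hypothetical post-processing helper that turns these samples into an average latency and throughput; summarize and copied_bytes are illustrative names, not part of the benchmark:

```cpp
#include <cstdint>
#include <iostream>
#include <numeric>
#include <vector>

// Hypothetical helper: averages per-iteration nanosecond timings and derives
// throughput from the number of bytes moved in one iteration.
void summarize(const std::vector<uint64_t>& timings_ns, uint64_t copied_bytes) {
    if (timings_ns.empty()) return;
    const uint64_t total_ns = std::accumulate(timings_ns.begin(), timings_ns.end(), uint64_t{0});
    const double avg_s = (total_ns / static_cast<double>(timings_ns.size())) / 1e9;
    const double gib_per_s = (copied_bytes / avg_s) / (1024.0 * 1024.0 * 1024.0);
    std::cout << "avg " << avg_s * 1e3 << " ms, " << gib_per_s << " GiB/s" << std::endl;
}

int main() {
    // Sample data in place of ITERATION_TIMING_; e.g. 1 GiB moved per iteration.
    std::vector<uint64_t> timings_ns = {1'200'000, 1'150'000, 1'180'000};
    summarize(timings_ns, uint64_t{1} << 30);
    return 0;
}
```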