@ -19,10 +19,20 @@ double avg(const std::vector<uint64_t>& v) {
return static_cast < long double > ( std : : accumulate ( v . begin ( ) , v . end ( ) , 0 ) ) / static_cast < long double > ( v . size ( ) ) ;
return static_cast < long double > ( std : : accumulate ( v . begin ( ) , v . end ( ) , 0 ) ) / static_cast < long double > ( v . size ( ) ) ;
}
}
double stdev ( const std : : vector < uint64_t > & v , const double mean ) {
std : : vector < double > diff ( v . size ( ) ) ;
std : : transform ( v . begin ( ) , v . end ( ) , diff . begin ( ) , [ mean ] ( double x ) { return x - mean ; } ) ;
const double sq_sum = std : : inner_product ( diff . begin ( ) , diff . end ( ) , diff . begin ( ) , 0.0 ) ;
const double stdev = std : : sqrt ( sq_sum / static_cast < double > ( v . size ( ) ) ) ;
return stdev ;
}
# define LOG_CODE_INFO "Location: " << __FILE__ << "@" << __LINE__ << "::" << __FUNCTION__ << std::endl
# define LOG_CODE_INFO "Location: " << __FILE__ << "@" << __LINE__ << "::" << __FUNCTION__ << std::endl
# define LOG_ERR { pthread_t t = pthread_self(); std::cerr << "--- BEGIN ERROR MSG ---" << std::endl << "Physical: [Node " << args->numa_node << " | Thread " << t << "]" << std::endl; } std::cerr << LOG_CODE_INFO
# define LOG_ERR { pthread_t t = pthread_self(); std::cerr << "--- BEGIN ERROR MSG ---" << std::endl << "Physical: [Node " << args->numa_node << " | Thread " << t << "]" << std::endl; } std::cerr << LOG_CODE_INFO
# define CHECK_STATUS(status,msg) { if (status != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(status) << std::endl << msg << std::endl; args->status = status; return nullptr; }}
# define CHECK_STATUS(status,msg) { if (status != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(status) << std::endl << msg << std::endl; args->status = status; return nullptr; }}
# define ADD_TIMING_MESSUREMENT { if (i >= 5) { submission_durations.emplace_back(std::chrono::duration_cast<std::chrono::microseconds>(se - st).count()); completion_durations.emplace_back(std::chrono::duration_cast<std::chrono::microseconds>(et - se).count()); combined_durations.emplace_back(std::chrono::duration_cast<std::chrono::microseconds>(et - st).count());}}
template < typename path >
template < typename path >
void * thread_function ( void * argp ) {
void * thread_function ( void * argp ) {
TaskData * args = reinterpret_cast < TaskData * > ( argp ) ;
TaskData * args = reinterpret_cast < TaskData * > ( argp ) ;
@ -43,21 +53,17 @@ void* thread_function(void* argp) {
args - > status = dml : : status_code : : ok ;
args - > status = dml : : status_code : : ok ;
args - > rep_completed = 0 ;
args - > rep_completed = 0 ;
for ( uint32_t i = 0 ; i < args - > rep_count ; i + + ) {
// we add 5 as the first 5 iterations will not be meassured
// to remove exceptional values encountered during warmup
for ( uint32_t i = 0 ; i < args - > rep_count + 5 ; i + + ) {
// synchronize the start of each iteration
// synchronize the start of each iteration
// using the barrier structure
// using the barrier structure
args - > barrier_ - > wait ( ) ;
args - > barrier_ - > wait ( ) ;
if ( args - > batch_submit ) {
if ( args - > batch_submit ) {
uint32_t opcount = args - > batch_size ;
if ( args - > barrier_after_n_operations > 0 ) {
opcount + = opcount / args - > barrier_after_n_operations ;
}
const auto st = std : : chrono : : high_resolution_clock : : now ( ) ;
const auto st = std : : chrono : : steady_clock : : now ( ) ;
auto sequence = dml : : sequence ( opcount , std : : allocator < dml : : byte_t > ( ) ) ;
auto sequence = dml : : sequence ( args - > batch_size , std : : allocator < dml : : byte_t > ( ) ) ;
for ( uint32_t j = 0 ; j < args - > batch_size ; j + + ) {
for ( uint32_t j = 0 ; j < args - > batch_size ; j + + ) {
// block_on_fault() is required to submit the task in a way so that the
// block_on_fault() is required to submit the task in a way so that the
@ -66,10 +72,6 @@ void* thread_function(void* argp) {
const auto status = sequence . add ( dml : : mem_copy . block_on_fault ( ) , srcv , dstv ) ;
const auto status = sequence . add ( dml : : mem_copy . block_on_fault ( ) , srcv , dstv ) ;
CHECK_STATUS ( status , " Adding operation to batch failed! " ) ;
CHECK_STATUS ( status , " Adding operation to batch failed! " ) ;
if ( j % args - > barrier_after_n_operations = = 0 ) {
sequence . add ( dml : : nop ) ;
}
}
}
// we use the asynchronous submit-routine even though this is not required
// we use the asynchronous submit-routine even though this is not required
@ -78,21 +80,48 @@ void* thread_function(void* argp) {
auto handler = dml : : submit < path > ( dml : : batch , sequence ) ;
auto handler = dml : : submit < path > ( dml : : batch , sequence ) ;
const auto se = std : : chrono : : high_resolution _clock: : now ( ) ;
const auto se = std : : chrono : : steady _clock: : now ( ) ;
auto result = handler . get ( ) ;
auto result = handler . get ( ) ;
const auto et = std : : chrono : : high_resolution _clock: : now ( ) ;
const auto et = std : : chrono : : steady _clock: : now ( ) ;
const dml : : status_code status = result . status ;
const dml : : status_code status = result . status ;
CHECK_STATUS ( status , " Batch completed with an Error! " ) ;
CHECK_STATUS ( status , " Batch completed with an Error! " ) ;
submission_durations . emplace_back ( std : : chrono : : duration_cast < std : : chrono : : microseconds > ( se - st ) . count ( ) ) ;
completion_durations . emplace_back ( std : : chrono : : duration_cast < std : : chrono : : microseconds > ( et - se ) . count ( ) ) ;
combined_durations . emplace_back ( std : : chrono : : duration_cast < std : : chrono : : microseconds > ( et - st ) . count ( ) ) ;
ADD_TIMING_MESSUREMENT ;
}
else if ( args - > batch_size > 1 ) {
// implementation for non-batched batch submit follows here
// this means we submit a bunch of work as single descriptors
// but then dont wait for the completion immediately
std : : vector < dml : : handler < dml : : mem_copy_operation , std : : allocator < uint8_t > > > handlers ;
const auto st = std : : chrono : : steady_clock : : now ( ) ;
for ( uint32_t j = 0 ; j < args - > batch_size ; j + + ) {
// block_on_fault() is required to submit the task in a way so that the
// DSA engine can handle page faults itself together with the IOMMU which
// requires the WQ to be configured to allow this too
handlers . emplace_back ( dml : : submit < path > ( dml : : mem_copy . block_on_fault ( ) , srcv , dstv ) ) ;
}
const auto se = std : : chrono : : steady_clock : : now ( ) ;
for ( auto & handler : handlers ) {
auto result = handler . get ( ) ;
const dml : : status_code status = result . status ;
CHECK_STATUS ( status , " Operation completed with an Error! " ) ;
}
const auto et = std : : chrono : : steady_clock : : now ( ) ;
ADD_TIMING_MESSUREMENT ;
}
}
else {
else {
const auto st = std : : chrono : : high_resolution_clock : : now ( ) ;
const auto st = std : : chrono : : steady _clock: : now ( ) ;
// we use the asynchronous submit-routine even though this is not required
// we use the asynchronous submit-routine even though this is not required
// here, however the project later on will only use async operation and
// here, however the project later on will only use async operation and
@ -102,18 +131,16 @@ void* thread_function(void* argp) {
// requires the WQ to be configured to allow this too
// requires the WQ to be configured to allow this too
auto handler = dml : : submit < path > ( dml : : mem_copy . block_on_fault ( ) , srcv , dstv ) ;
auto handler = dml : : submit < path > ( dml : : mem_copy . block_on_fault ( ) , srcv , dstv ) ;
const auto se = std : : chrono : : high_resolution _clock: : now ( ) ;
const auto se = std : : chrono : : steady _clock: : now ( ) ;
auto result = handler . get ( ) ;
auto result = handler . get ( ) ;
const auto et = std : : chrono : : high_resolution _clock: : now ( ) ;
const auto et = std : : chrono : : steady _clock: : now ( ) ;
const dml : : status_code status = result . status ;
const dml : : status_code status = result . status ;
CHECK_STATUS ( status , " Operation completed with an Error! " ) ;
CHECK_STATUS ( status , " Operation completed with an Error! " ) ;
submission_durations . emplace_back ( std : : chrono : : duration_cast < std : : chrono : : microseconds > ( se - st ) . count ( ) ) ;
completion_durations . emplace_back ( std : : chrono : : duration_cast < std : : chrono : : microseconds > ( et - se ) . count ( ) ) ;
combined_durations . emplace_back ( std : : chrono : : duration_cast < std : : chrono : : microseconds > ( et - st ) . count ( ) ) ;
ADD_TIMING_MESSUREMENT ;
}
}
args - > rep_completed + + ;
args - > rep_completed + + ;
@ -126,6 +153,9 @@ void* thread_function(void* argp) {
args - > combined_duration = avg ( combined_durations ) ;
args - > combined_duration = avg ( combined_durations ) ;
args - > complete_duration = avg ( completion_durations ) ;
args - > complete_duration = avg ( completion_durations ) ;
args - > submit_duration = avg ( submission_durations ) ;
args - > submit_duration = avg ( submission_durations ) ;
args - > combined_duration_stdev = stdev ( combined_durations , args - > combined_duration ) ;
args - > complete_duration_stdev = stdev ( completion_durations , args - > complete_duration ) ;
args - > submit_duration_stdev = stdev ( submission_durations , args - > submit_duration ) ;
return nullptr ;
return nullptr ;
}
}