@ -113,7 +113,7 @@ namespace dsacache {
// dml handler vector pointer which is used
// to wait on caching task completion
std : : atomic < dml_handler * > * handler_ ;
std : : atomic < std : : vector < dml_handler > * > * handlers _ ;
// deallocates the global cache-location
// and invalidates it
@ -122,8 +122,8 @@ namespace dsacache {
size_t GetSize ( ) const { return size_ ; }
uint8_t * GetSource ( ) const { return src_ ; }
int32_t GetRefCount ( ) const { return active_ - > load ( ) ; }
void SetTaskHandlerAndCache ( uint8_t * cache , dml_handler * handler ) ;
void SetCacheToSource ( ) { cache_ - > store ( src_ ) ; delete_ = false ; }
void SetTaskHandlersAndCache ( uint8_t * cache , std : : vector < dml_handler > * handlers ) ;
// initializes the class after which it is thread safe
// but may only be destroyed safely after setting handlers
@ -445,34 +445,40 @@ inline uint8_t* dsacache::Cache::AllocOnNode(const size_t size, const int node)
}
inline void dsacache : : Cache : : SubmitTask ( CacheData * task , const int dst_node , const int src_node ) {
static thread_local int last_node_index = - 1 ;
// stores the last node used for the local thread so we can achieve some
// load balancing which locally might look like round robin, but considering
// that one source thread may see different results for "executing_nodes" with
// different sizes, and that multiple threads will submit, in reality we
// achieve a "wild-west-style" load balance here
uint8_t * dst = AllocOnNode ( task - > GetSize ( ) , dst_node ) ;
if ( dst = = nullptr ) {
return ;
}
// querry copy policy function for the nodes available to use for the copy
// querry copy policy function for the nodes to use for the copy
const std : : vector < int > executing_nodes = copy_policy_function_ ( dst_node , src_node , task - > GetSize ( ) ) ;
const size_t task_count = executing_nodes . size ( ) ;
// each task will copy one fair part of the total size
// and in case the total size is not a factor of the
// given task count the last node must copy the remainder
// use our load balancing method and determine node for this task
const size_t size = task - > GetSize ( ) / task_count ;
const size_t last_size = size + task - > GetSize ( ) % task_count ;
last_node_index = + + last_node_index % executing_nodes . size ( ) ;
const int node = executing_nodes [ last_node_index ] ;
// save the current numa node mask to restore later
// as executing the copy task will place this thread
// on a different node
// submit the copy and attach it to the task entry
auto handlers = new std : : vector < CacheData : : dml_handler > ( ) ;
for ( uint32_t i = 0 ; i < task_count ; i + + ) {
const size_t local_size = i + 1 = = task_count ? size : last_size ;
const size_t local_offset = i * size ;
const uint8_t * local_src = task - > GetSource ( ) + local_offset ;
uint8_t * local_dst = dst + local_offset ;
handlers - > emplace_back ( ExecuteCopy ( local_src , local_dst , local_size , executing_nodes [ i ] ) ) ;
}
auto * handler = new CacheData : : dml_handler ( ) ;
* handler = ExecuteCopy ( task - > GetSource ( ) , dst , task - > GetSize ( ) , node ) ;
task - > SetTaskHandlerAndCache ( dst , handler ) ;
task - > SetTaskHandlersAndCache ( dst , handlers ) ;
}
inline dml : : handler < dml : : mem_copy_operation , std : : allocator < uint8_t > > dsacache : : Cache : : ExecuteCopy (
@ -634,7 +640,7 @@ inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size) {
delete_ = false ;
active_ = new std : : atomic < int32_t > ( 1 ) ;
cache_ = new std : : atomic < uint8_t * > ( data ) ;
handler_ = new std : : atomic < dml_handler * > ( nullptr ) ;
handlers _ = new std : : atomic < std : : vector < dml_handler > * > ( ) ;
incomplete_cache_ = new uint8_t * ( nullptr ) ;
}
@ -651,7 +657,7 @@ inline dsacache::CacheData::CacheData(const dsacache::CacheData& other) {
flags_ = other . flags_ ;
incomplete_cache_ = other . incomplete_cache_ ;
handler_ = other . handler_ ;
handlers _ = other . handlers _ ;
}
inline dsacache : : CacheData : : ~ CacheData ( ) {
@ -676,7 +682,7 @@ inline dsacache::CacheData::~CacheData() {
delete active_ ;
delete cache_ ;
delete handler_ ;
delete handlers _ ;
delete incomplete_cache_ ;
}
}
@ -704,7 +710,7 @@ inline void dsacache::CacheData::WaitOnCompletion() {
// then check if the handlers are available
handler_ - > wait ( nullptr ) ;
handlers _ - > wait ( nullptr ) ;
// exchange the global handlers pointer with nullptr to have a local
// copy - this signals that this thread is the sole owner and therefore
@ -712,37 +718,44 @@ inline void dsacache::CacheData::WaitOnCompletion() {
// set to maximum of 64-bit in order to prevent deadlocks from the above
// waiting construct
dml_handler * local_handler = handler_ - > exchange ( reinterpret_cast < dml_handler * > ( maxptr ) ) ;
std : : vector < dml_handler > * local_handlers = handlers _ - > exchange ( reinterpret_cast < std : : vector < dml_handler > * > ( maxptr ) ) ;
// ensure that no other thread snatched the handlers before us
// and in case one did, wait again and then return
if ( local_handler = = nullptr | | local_handler = = reinterpret_cast < dml_handler * > ( maxptr ) ) {
if ( local_handlers = = nullptr | | local_handlers = = reinterpret_cast < std : : vector < dml_handler > * > ( maxptr ) ) {
cache_ - > wait ( nullptr ) ;
return ;
}
// at this point we are responsible for waiting for the handlers
// and handling any error that comes through them gracefully
bool error = false ;
if ( CheckFlag ( flags_ , FLAG_WAIT_WEAK ) & & ! local_handler - > is_finished ( ) ) {
handler_ - > store ( local_handler ) ;
return ;
}
for ( auto & handler : * local_handlers ) {
if ( CheckFlag ( flags_ , FLAG_WAIT_WEAK ) & & ! handler . is_finished ( ) ) {
handlers_ - > store ( local_handlers ) ;
return ;
}
// perform the wait
auto result = handler . get ( ) ;
auto result = local_handler - > get ( ) ;
if ( result . status ! = dml : : status_code : : ok ) {
// if one of the copy tasks failed we abort the whole task
// after all operations are completed on it
error = true ;
}
}
// at this point handlers has been waited for
// at this point all handlers have been waited for
// and therefore may be decomissioned
delete local_handler ;
delete local_handlers ;
// if the copy tasks failed we abort the whole task
// otherwise the cache will be set to valid now
// handle errors now by aborting the cache
if ( result . status ! = dml : : status_code : : ok ) {
if ( error ) {
cache_ - > store ( src_ ) ;
numa_free ( * incomplete_cache_ , size_ ) ;
delete_ = false ;
@ -755,13 +768,13 @@ inline void dsacache::CacheData::WaitOnCompletion() {
// notify all waiting threads so they wake up quickly
cache_ - > notify_all ( ) ;
handler_ - > notify_all ( ) ;
handlers _ - > notify_all ( ) ;
}
void dsacache : : CacheData : : SetTaskHandlerAndCache ( uint8_t * cache , dml_handler * handler ) {
void dsacache : : CacheData : : SetTaskHandlers AndCache ( uint8_t * cache , std : : vector < dml_handler > * handlers ) {
* incomplete_cache_ = cache ;
handler_ - > store ( handler ) ;
handler_ - > notify_one ( ) ;
handlers _ - > store ( handlers ) ;
handlers _ - > notify_one ( ) ;
}
void dsacache : : CacheData : : Init ( ) {