@@ -5,12 +5,12 @@
#include <fstream>
#include <future>
#include <array>
#include <atomic>
#include "const.h"
#include "filter.h"
#include "aggregation.h"
#include "array_utils.h"
#include "memory_literals.h"
#include "../../offloading-cacher/cache.hpp"
@@ -22,36 +22,32 @@ using aggregation = Aggregation<uint64_t, Sum, load_mode::Stream>;
dsacache::Cache CACHE_;
std::array<std::atomic<int32_t>, GROUP_COUNT> PREFETCHED_CHUNKS_;
std::vector<std::barrier<NopStruct>*> BARRIERS_;
std::shared_future<void> LAUNCH_;
uint64_t* DATA_A_;
uint64_t* DATA_B_;
uint16_t* MASK_A_;
uint16_t* MASK_B_;
uint64_t* DATA_DST_;
// if there are more scan-b threads than aggr-j threads -> process b normally and sub-split across the aggr-j granularity (overchunking)
// if there are more aggr-j threads than scan-b threads -> sub-split each scan-b chunk as implemented below (subchunking)
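// example (hypothetical thread counts): TC_SCANB = 1, TC_AGGRJ = 4 gives SUBCHUNK_THREAD_RATIO = 4,
// so each caching thread splits its chunk into four sub-chunks (subchunking); TC_SCANB = 4, TC_AGGRJ = 1
// gives VIRT_TID_INCREMENT = 4, so caching runs at aggr-j granularity over virtual thread ids (overchunking)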
void scan_b(size_t gid, size_t tid) {
    constexpr size_t VIRT_TID_INCREMENT = TC_SCANB / TC_AGGRJ;
    constexpr size_t SUBCHUNK_THREAD_RATIO = TC_AGGRJ / (TC_SCANB == 0 ? 1 : TC_SCANB);
template <size_t TC_CACHING>
void caching(size_t gid, size_t tid) {
    constexpr size_t VIRT_TID_INCREMENT = TC_CACHING / TC_AGGRJ;
    constexpr size_t SUBCHUNK_THREAD_RATIO = TC_AGGRJ / (TC_CACHING == 0 ? 1 : TC_CACHING);
    constexpr bool CACHE_SUBCHUNKING = SUBCHUNK_THREAD_RATIO > 1;
    constexpr bool CACHE_OVERCHUNKING = VIRT_TID_INCREMENT > 1;
    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid, tid)].clear();
    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid, tid)].resize(1);
    LAUNCH_.wait();
    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid, tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
    if constexpr (PERFORM_CACHING) {
        if constexpr (CACHE_SUBCHUNKING) {
            constexpr size_t SUBCHUNK_COUNT = SUBCHUNK_THREAD_RATIO > 0 ? SUBCHUNK_THREAD_RATIO : 1;
            constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / SUBCHUNK_COUNT;
            constexpr size_t SUBCHUNK_SIZE_ELEMENTS = CHUNK_SIZE_ELEMENTS / SUBCHUNK_COUNT;
            constexpr size_t LAST_SUBCHUNK_SIZE_B = SUBCHUNK_SIZE_B + (CHUNK_SIZE_B % SUBCHUNK_COUNT);
            // TODO: last thread (whether simulated or not) in last group must use last subchunk size for its last subchunk in the last run as size
            for (size_t i = 0; i < RUN_COUNT; i++) {
                const size_t chunk_index = get_chunk_index(gid, i);
@@ -59,46 +55,67 @@ void scan_b(size_t gid, size_t tid) {
                for (size_t j = 0; j < SUBCHUNK_COUNT; j++) {
                    uint64_t* sub_chunk_ptr = &chunk_ptr[j * SUBCHUNK_SIZE_ELEMENTS];
                    CACHE_.Access(reinterpret_cast<uint8_t*>(sub_chunk_ptr), SUBCHUNK_SIZE_B);
                    PREFETCHED_CHUNKS_[gid]++;
                    PREFETCHED_CHUNKS_[gid].notify_one();
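                    // the per-group atomic acts as a counting semaphore: each cache submission
                    // adds one available chunk and wakes one aggregation thread that may be
                    // blocked in PREFETCHED_CHUNKS_[gid].wait(0) (see aggr_j below)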
                }
            }
        }
        else if constexpr (CACHE_OVERCHUNKING) {
            constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % (TC_AGGRJ * GROUP_COUNT));
            // TODO: last thread (whether simulated or not) in last group must use last chunk size for its last run as size
            for (size_t tid_virt = tid; tid_virt < TC_AGGRJ; tid_virt += VIRT_TID_INCREMENT) {
                for (size_t i = 0; i < RUN_COUNT; i++) {
                    const size_t chunk_index = get_chunk_index(gid, i);
                    uint64_t* chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, tid_virt);
                    CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), CHUNK_SIZE_B);
                    PREFETCHED_CHUNKS_[gid]++;
                    PREFETCHED_CHUNKS_[gid].notify_one();
                }
            }
        }
        else {
            constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % ((TC_SCANB > 0 ? TC_SCANB : 1) * GROUP_COUNT));
            // TODO: last thread in last group must use last chunk size for its last run as size
            for (size_t i = 0; i < RUN_COUNT; i++) {
                const size_t chunk_index = get_chunk_index(gid, i);
                uint64_t* chunk_ptr = get_chunk<TC_SCANB>(DATA_B_, chunk_index, tid);
                CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), CHUNK_SIZE_B);
                PREFETCHED_CHUNKS_[gid]++;
                PREFETCHED_CHUNKS_[gid].notify_one();
            }
        }
    }
}
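// caching<TC_CACHING> thus covers three layouts: sub-chunking, over-chunking, and the
// one-to-one default; in every case each submitted region increments the group's
// prefetched-chunk counter consumed by aggr_j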
    if constexpr (COMPLEX_QUERY) {
        for (size_t i = 0; i < RUN_COUNT; i++) {
            const size_t chunk_index = get_chunk_index(gid, i);
            uint64_t* chunk_ptr = get_chunk<TC_SCANB>(DATA_B_, chunk_index, tid);
            uint16_t* mask_ptr = get_mask<TC_SCANB>(MASK_B_, chunk_index, tid);
void scan_b(size_t gid, size_t tid) {
    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid, tid)].clear();
    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid, tid)].resize(1);
            filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_B, CHUNK_SIZE_B / TC_SCANB);
        }
    LAUNCH_.wait();
    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid, tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
    if constexpr (!PERFORM_CACHING_IN_AGGREGATION) {
        caching<TC_SCANB>(gid, tid);
    }
    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid, tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid, tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
    BARRIERS_[gid]->arrive_and_drop();
}
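// in the revised scan_b the column-b filter has been dropped; the thread only drives prefetching
// via caching<TC_SCANB> (unless PERFORM_CACHING_IN_AGGREGATION presumably moves that work into
// aggr_j) and then leaves the group barrier with arrive_and_drop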
void scan_a(size_t gid, size_t tid) {
    constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % (TC_SCANA * GROUP_COUNT));
    // TODO: last thread in last group must use last chunk size for its last run as size
    THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid, tid)].clear();
    THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid, tid)].resize(1);
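    // scan_a filters DATA_A_ chunk by chunk into MASK_A_ and meets the aggregation
    // threads of its group at the barrier after every chunk (see arrive_and_wait below)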
@@ -112,6 +129,8 @@ void scan_a(size_t gid, size_t tid) {
        uint16_t* mask_ptr = get_mask<TC_SCANA>(MASK_A_, chunk_index, tid);
        filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA);
        BARRIERS_[gid]->arrive_and_wait();
    }
    THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid, tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
@@ -121,32 +140,42 @@ void scan_a(size_t gid, size_t tid) {
}
void aggr_j(size_t gid, size_t tid) {
    constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / TC_AGGRJ;
    constexpr size_t LAST_CHUNK_SIZE_B = SUBCHUNK_SIZE_B + (CHUNK_SIZE_B % (TC_AGGRJ * GROUP_COUNT));
    // TODO: last thread in last group must use last chunk size for its last run as size
    CACHE_HITS_[UniqueIndex(gid, tid)] = 0;
    THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid, tid)].clear();
    THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid, tid)].resize(1);
    THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid, tid)].resize(RUN_COUNT);
    __m512i aggregator = aggregation::OP::zero();
    LAUNCH_.wait();
    THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid, tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
    for (size_t i = 0; i < RUN_COUNT; i++) {
        THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid, tid)][i][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
        BARRIERS_[gid]->arrive_and_wait();
        THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid, tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
        while (true) {
            const int32_t old = PREFETCHED_CHUNKS_[gid].fetch_sub(1);
            if (old > 0) break;
            PREFETCHED_CHUNKS_[gid]++;
            PREFETCHED_CHUNKS_[gid].wait(0);
        }
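        // consumer side of the chunk counter: claim one prefetched chunk; if none is
        // available, undo the decrement and block until the caching thread publishes the next one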
        THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid, tid)][i][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
    for (size_t i = 0; i < RUN_COUNT; i++) {
        const size_t chunk_index = get_chunk_index(gid, i);
        uint64_t* chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, tid);
        uint16_t* mask_ptr_a = get_mask<TC_AGGRJ>(MASK_A_, chunk_index, tid);
        uint16_t* mask_ptr_b = get_mask<TC_AGGRJ>(MASK_B_, chunk_index, tid);
        std::unique_ptr<dsacache::CacheData> data;
        uint64_t* data_ptr;
        if constexpr (PERFORM_CACHING) {
            data = CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), CHUNK_SIZE_B / TC_AGGRJ, dsacache::FLAG_ACCESS_WEAK);
            data = CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), SUBCHUNK_SIZE_B, dsacache::FLAG_ACCESS_WEAK);
            data->WaitOnCompletion();
            data_ptr = reinterpret_cast<uint64_t*>(data->GetDataLocation());
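            // note: FLAG_ACCESS_WEAK asks the cache for an existing copy without forcing a new
            // transfer; GetDataLocation() then presumably resolves to the cached copy on a hit
            // and to the original chunk otherwise (see cache.hpp for the exact semantics)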
@@ -165,17 +194,11 @@ void aggr_j(size_t gid, size_t tid) {
        }
        uint64_t tmp = _mm512_reduce_add_epi64(aggregator);
        aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, SUBCHUNK_SIZE_B);
        if constexpr (COMPLEX_QUERY) {
            aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, mask_ptr_b, CHUNK_SIZE_B / TC_AGGRJ);
        }
        else {
            aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, CHUNK_SIZE_B / TC_AGGRJ);
        }
        THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid, tid)][i][TIME_STAMP_END] = std::chrono::steady_clock::now();
    }
    THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid, tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
    BARRIERS_[gid]->arrive_and_drop();
    aggregation::happly(&DATA_DST_[UniqueIndex(gid, tid)], aggregator);
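    // each aggregation thread writes its partial sum into its own DATA_DST_ slot
    // (the array is sized TC_AGGRJ * GROUP_COUNT); the slots are presumably reduced
    // into result_actual in main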
@@ -188,35 +211,21 @@ int main() {
    const int current_cpu = sched_getcpu();
    const int current_node = numa_node_of_cpu(current_cpu);
    const int cache_node = CachePlacementPolicy(current_node, current_node, 0);
    if (current_node != MEM_NODE_DRAM) {
        std::cerr << "Application is not running on pre-programmed Node!" << std::endl;
    }
    const std::string ofname = "results/qdp-xeonmax-" + std::string(MODE_STRING) + "-tca" + std::to_string(TC_SCANA) + "-tcb" + std::to_string(TC_SCANB) + "-tcj" + std::to_string(TC_AGGRJ) + "-tmul" + std::to_string(GROUP_COUNT) + "-wl" + std::to_string(WL_SIZE_B) + "-cs" + std::to_string(CHUNK_SIZE_B) + ".csv";
    std::ofstream fout(ofname);
    fout << "run;rt-ns;rt-s;result[0];scana-run;scana-wait;scanb-run;scanb-wait;aggrj-run;aggrj-wait;cache-hr;" << std::endl;
    // a is always allocated in DRAM
    DATA_A_ = (uint64_t*)numa_alloc_local(WL_SIZE_B);
    // resulting masks for a and b and the total result will always reside in HBM
    MASK_A_ = (uint16_t*)numa_alloc_onnode(WL_SIZE_ELEMENTS, cache_node);
    MASK_B_ = (uint16_t*)numa_alloc_onnode(WL_SIZE_ELEMENTS, cache_node);
    DATA_DST_ = (uint64_t*)numa_alloc_onnode(TC_AGGRJ * GROUP_COUNT * sizeof(uint64_t), cache_node);
    // location of b depends on configuration
    if constexpr (STORE_B_IN_HBM) {
        DATA_B_ = (uint64_t*)numa_alloc_onnode(WL_SIZE_B, cache_node);
    }
    else {
        DATA_B_ = (uint64_t*)numa_alloc_local(WL_SIZE_B);
    }
    DATA_A_ = (uint64_t*)numa_alloc_onnode(WL_SIZE_B, MEM_NODE_A);
    DATA_B_ = (uint64_t*)numa_alloc_onnode(WL_SIZE_B, MEM_NODE_B);
    DATA_A_ = (uint64_t*)numa_alloc_local(WL_SIZE_B);
    MASK_A_ = (uint16_t*)numa_alloc_local(WL_SIZE_ELEMENTS);
    DATA_DST_ = (uint64_t*)numa_alloc_local(TC_AGGRJ * GROUP_COUNT * sizeof(uint64_t));
    MASK_A_ = (uint16_t*)numa_alloc_onnode(WL_SIZE_ELEMENTS, MEM_NODE_HBM);
    DATA_DST_ = (uint64_t*)numa_alloc_onnode(TC_AGGRJ * GROUP_COUNT * sizeof(uint64_t), MEM_NODE_HBM);
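    // in the revised allocation scheme above, DATA_A_ and DATA_B_ are placed on the
    // configurable nodes MEM_NODE_A and MEM_NODE_B, while MASK_A_ and DATA_DST_ move to
    // the HBM node MEM_NODE_HBM instead of the local/cache_node-based placement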
    if constexpr (PERFORM_CACHING) {
        // cache will be configured to wait weak by default
@@ -241,7 +250,7 @@ int main() {
    std::vector<std::thread> agg_pool;
    for (uint32_t gid = 0; gid < GROUP_COUNT; ++gid) {
        BARRIERS_.emplace_back(new std::barrier<NopStruct>(TC_COMBINED));
        BARRIERS_.emplace_back(new std::barrier<NopStruct>(TC_SCANA + TC_AGGRJ));
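        // the per-group barrier is now sized for the scan_a and aggr_j threads only,
        // instead of the presumably combined thread count TC_COMBINED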
        for (uint32_t tid = 0; tid < TC_SCANA; ++tid) {
            filter_pool.emplace_back(scan_a, gid, tid);
@@ -269,7 +278,7 @@ int main() {
    const auto time_end = std::chrono::steady_clock::now();
    const uint64_t result_expected = COMPLEX_QUERY ? sum_check_complex(CMP_A, CMP_B, DATA_A_, DATA_B_, WL_SIZE_B) : sum_check(CMP_A, DATA_A_, DATA_B_, WL_SIZE_B);
    const uint64_t result_expected = sum_check(CMP_A, DATA_A_, DATA_B_, WL_SIZE_B);
    std::cout << "Result Expected: " << result_expected << ", Result Actual: " << result_actual << std::endl;