
finish first stage of caching implementation and provide a rudimentary test function in the main

master
Constantin Fürst · 12 months ago
commit 5e30a370ce
  1. offloading-cacher/benchmark.hpp (174 lines changed)
  2. offloading-cacher/main.cpp (80 lines changed)
  3. offloading-cacher/offloading-cache.hpp (288 lines changed)

offloading-cacher/benchmark.hpp (174 lines, file deleted)

@@ -1,174 +0,0 @@
#pragma once

#include <iostream>
#include <vector>
#include <chrono>
#include <numeric>

#include <pthread.h>
#include <semaphore.h>
#include <numa.h>

#include <dml/dml.hpp>

#include "util/barrier.hpp"
#include "util/dml-helper.hpp"
#include "util/task-data.hpp"

#define LOG_CODE_INFO "Location: " << __FILE__ << "@" << __LINE__ << "::" << __FUNCTION__ << std::endl
#define LOG_ERR { pthread_t t = pthread_self(); std::cerr << "--- BEGIN ERROR MSG ---" << std::endl << "Physical: [Node " << args->numa_node << " | Thread " << t << "]" << std::endl; } std::cerr << LOG_CODE_INFO
#define CHECK_STATUS(status,msg) { if (status != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(status) << std::endl << msg << std::endl; args->status = status; return nullptr; }}
#define ADD_TIMING_MESSUREMENT { if (i >= 5) { args->submit_duration.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(se - st).count()); args->complete_duration.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(et - se).count()); args->combined_duration.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(et - st).count());}}

template <typename path>
void* thread_function(void* argp) {
    TaskData* args = reinterpret_cast<TaskData*>(argp);

    // set numa node and core affinity of the current thread
    numa_run_on_node(args->numa_node);

    // allocate memory for the move operation on the requested numa nodes
    void* src = numa_alloc_onnode(args->size, args->nnode_src);
    void* dst = numa_alloc_onnode(args->size, args->nnode_dst);
    dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(src), args->size);
    dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(dst), args->size);

    std::memset(src, 0, args->size);
    std::memset(dst, 0, args->size);

    args->status = dml::status_code::ok;
    args->rep_completed = 0;

    std::chrono::time_point<std::chrono::steady_clock> tps;

    // we add 5 as the first 5 iterations will not be measured
    // to remove exceptional values encountered during warmup
    for (uint32_t i = 0; i < args->rep_count + 5; i++) {
        // synchronize the start of each iteration
        // using the barrier structure
        args->barrier_->wait();

        if (args->batch_submit) {
            const auto st = std::chrono::steady_clock::now();

            auto sequence = dml::sequence(args->batch_size, std::allocator<dml::byte_t>());

            for (uint32_t j = 0; j < args->batch_size; j++) {
                // block_on_fault() is required to submit the task in a way so that the
                // DSA engine can handle page faults itself together with the IOMMU which
                // requires the WQ to be configured to allow this too
                const auto status = sequence.add(dml::mem_copy.block_on_fault(), srcv, dstv);
                CHECK_STATUS(status, "Adding operation to batch failed!");
            }

            // we use the asynchronous submit-routine even though this is not required
            // here, however the project later on will only use async operation and
            // therefore this behaviour should be benchmarked
            auto handler = dml::submit<path>(dml::batch, sequence);

            const auto se = std::chrono::steady_clock::now();

            auto result = handler.get();

            const auto et = std::chrono::steady_clock::now();

            const dml::status_code status = result.status;
            CHECK_STATUS(status, "Batch completed with an Error!");

            ADD_TIMING_MESSUREMENT;
        }
        else if (args->batch_size > 1) {
            // implementation for non-batched batch submit follows here
            // this means we submit a bunch of work as single descriptors
            // but then don't wait for the completion immediately
            std::vector<dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>>> handlers;

            const auto st = std::chrono::steady_clock::now();

            for (uint32_t j = 0; j < args->batch_size; j++) {
                // block_on_fault() is required to submit the task in a way so that the
                // DSA engine can handle page faults itself together with the IOMMU which
                // requires the WQ to be configured to allow this too
                handlers.emplace_back(dml::submit<path>(dml::mem_copy.block_on_fault(), srcv, dstv));
            }

            const auto se = std::chrono::steady_clock::now();

            for (auto& handler : handlers) {
                auto result = handler.get();
                const dml::status_code status = result.status;
                CHECK_STATUS(status, "Operation completed with an Error!");
            }

            const auto et = std::chrono::steady_clock::now();

            ADD_TIMING_MESSUREMENT;
        }
        else {
            const auto st = std::chrono::steady_clock::now();

            // we use the asynchronous submit-routine even though this is not required
            // here, however the project later on will only use async operation and
            // therefore this behaviour should be benchmarked
            // block_on_fault() is required to submit the task in a way so that the
            // DSA engine can handle page faults itself together with the IOMMU which
            // requires the WQ to be configured to allow this too
            auto handler = dml::submit<path>(dml::mem_copy.block_on_fault(), srcv, dstv);

            const auto se = std::chrono::steady_clock::now();

            auto result = handler.get();

            const auto et = std::chrono::steady_clock::now();

            const dml::status_code status = result.status;
            CHECK_STATUS(status, "Operation completed with an Error!");

            ADD_TIMING_MESSUREMENT;
        }

        // again: we do not count the first 5 repetitions
        if (i == 5) tps = std::chrono::steady_clock::now();
        if (i >= 5) args->rep_completed++;
    }

    const auto tpe = std::chrono::steady_clock::now();

    args->total_time = std::chrono::duration_cast<std::chrono::nanoseconds>(tpe - tps).count();

    // free the allocated memory regions on the selected nodes
    numa_free(src, args->size);
    numa_free(dst, args->size);

    return nullptr;
}

template <typename path>
void execute_dml_memcpy(std::vector<TaskData>& args) {
    barrier task_barrier(args.size());
    std::vector<pthread_t> threads;

    // initialize numa library
    numa_available();

    // for each submitted task we link the semaphore
    // and create the thread, passing the argument
    for (auto& arg : args) {
        arg.barrier_ = &task_barrier;
        threads.emplace_back();

        if (pthread_create(&threads.back(), nullptr, thread_function<path>, &arg) != 0) {
            std::cerr << "Error creating thread" << std::endl;
            exit(1);
        }
    }

    for (pthread_t& t : threads) {
        pthread_join(t, nullptr);
    }
}
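The deleted benchmark synchronizes the start of every iteration through the barrier from util/barrier.hpp, which is not part of this commit. A minimal sketch of what such a reusable barrier could look like follows; the class name and layout are assumptions for illustration, not the repository's actual util/barrier.hpp:

// hypothetical stand-in for util/barrier.hpp, shown only to make the
// deleted benchmark above self-contained; the real header may differ
#include <condition_variable>
#include <cstddef>
#include <mutex>

class barrier {
public:
    explicit barrier(const size_t count) : threshold_(count), count_(count) {}

    // blocks until as many threads as given to the constructor have arrived
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        const size_t gen = generation_;
        if (--count_ == 0) {
            // last arrival resets the barrier for the next iteration
            generation_++;
            count_ = threshold_;
            cv_.notify_all();
        }
        else {
            cv_.wait(lock, [this, gen] { return gen != generation_; });
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    const size_t threshold_;
    size_t count_;
    size_t generation_ = 0;
};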

offloading-cacher/main.cpp (80 lines changed)

@@ -1,42 +1,64 @@
-#include <dml/dml.hpp>
-#include <vector>
 #include <iostream>
-#include <fstream>
+#include <random>
-#include "benchmark.hpp"
+#include "offloading-cache.hpp"
-int main(int argc, char **argv) {
-    if (argc < 3) {
-        std::cout << "Missing input and output file names." << std::endl;
-        std::cout << "Usage: ./benchmarks [input.json] [output.json]" << std::endl;
-        return 1;
-    }
+double* GetRandomArray(const size_t size) {
+    double* array = new double[size];
-    const std::string input = argv[1];
-    const std::string output = argv[2];
+    std::uniform_real_distribution<double> unif(std::numeric_limits<double>::min(), std::numeric_limits<double>::max());
+    std::default_random_engine re;
-    std::string path;
-    std::vector<TaskData> args;
+    for (size_t i = 0; i < size; i++) {
+        array[i] = unif(re);
+    }
-    std::ifstream is(input);
-    ReadWorkDescription(args, path, is);
-    is.close();
+    return array;
+}
-    if (path == "hw") {
-        execute_dml_memcpy<dml::hardware>(args);
+bool IsEqual(const double* a, const double* b, const size_t size) {
+    for (size_t i = 0; i < size; i++) {
+        try {
+            if (a[i] != b[i]) return false;
-    }
+        }
-    else if (path == "sw") {
-        execute_dml_memcpy<dml::software>(args);
+        catch (...) {
+            return false;
-    }
+        }
-    else if (path == "auto") {
-        execute_dml_memcpy<dml::automatic>(args);
-    }
+    }
-    else {
-        std::cerr << "Path is neither hw/sw/auto." << std::endl;
+    return true;
-    }
+}
-    std::ofstream os(output);
-    WriteResultLog(args, path, os);
-    os.close();
+int main(int argc, char **argv) {
+    offcache::Cache cache;
+
+    auto cache_policy = [](const int numa_dst_node, const int numa_src_node, const size_t data_size) {
+        return numa_dst_node;
+    };
+
+    auto copy_policy = [](const int numa_dst_node, const int numa_src_node) {
+        return std::vector{ numa_src_node };
+    };
+
+    cache.Init(cache_policy,copy_policy);
+
+    static constexpr size_t data_size = 8192;
+    double* data = GetRandomArray(data_size);
+
+    std::unique_ptr<offcache::CacheData> data_cache = cache.Access(reinterpret_cast<uint8_t *>(data), data_size * sizeof(double), offcache::ExecutionPolicy::Relaxed);
+
+    data_cache->WaitOnCompletion();
+
+    double* cached = reinterpret_cast<double *>(data_cache->GetDataLocation());
+
+    if (data == cached) {
+        std::cout << "Caching did not affect data location." << std::endl;
+    }
+
+    if (IsEqual(data,cached,data_size)) {
+        std::cout << "Cached data is correct." << std::endl;
+    }
+    else {
+        std::cout << "Cached data is wrong." << std::endl;
+    }
-}
+}
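The test above uses ExecutionPolicy::Relaxed and therefore has to call WaitOnCompletion() before reading the cached location. Based on the comments in offloading-cache.hpp, the Immediate policy instead returns an entry whose location is valid right away. A sketch of that assumed usage pattern, reusing the cache and data objects from the test above (this is illustrative, not code from the commit):

// assumed usage of the Immediate policy described in the header comments
std::unique_ptr<offcache::CacheData> entry = cache.Access(
    reinterpret_cast<uint8_t*>(data), data_size * sizeof(double),
    offcache::ExecutionPolicy::Immediate);

// with Immediate the location is usable immediately: it refers to the
// source buffer until the background copy has been waited on
double* usable = reinterpret_cast<double*>(entry->GetDataLocation());

// waiting is still possible when the caller wants the cached copy itself
entry->WaitOnCompletion();
double* cached_copy = reinterpret_cast<double*>(entry->GetDataLocation());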

offloading-cacher/offloading-cache.hpp (288 lines changed)

@@ -1,15 +1,20 @@
 #pragma once
 
+#include <iostream>
+
 #include <atomic>
 #include <vector>
 #include <thread>
 #include <unordered_map>
 #include <shared_mutex>
+#include <mutex>
+#include <memory>
 
 #include <semaphore.h>
 #include <sched.h>
 #include <numa.h>
+#include <numaif.h>
 
 #include <dml/dml.hpp>
@@ -30,6 +35,8 @@ namespace offcache {
         Relaxed, Immediate, ImmediateNoCache
     };
 
+    class Cache;
+
     // the cache task structure will be used to submit and
     // control a cache element, while providing source pointer
     // and size in bytes for submission
@@ -37,19 +44,41 @@
     // then the submitting thread may wait on the atomic "result"
     // which will be notified by the cache worker upon processing
    // after which the atomic-bool-ptr active will also become valid
-    struct CacheTask {
-        uint8_t* data_;
+    class CacheData {
+    public:
+        using dml_handler = dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>>;
+
+    private:
+        uint8_t* src_;
         size_t size_;
-        uint8_t* result_ = nullptr;
-        uint8_t* maybe_result_ = nullptr;
-        std::atomic<bool> active_ { true };
-        std::atomic<bool> valid_ { false };
-        std::vector<dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>>> handlers_;
+        std::atomic<int32_t>* active_;
+
+    protected:
+        std::atomic<uint8_t*>* cache_;
+        uint8_t* incomplete_cache_;
+        std::unique_ptr<std::vector<dml_handler>> handlers_;
+
+        friend Cache;
+
+    public:
+        CacheData(uint8_t* data, const size_t size);
+        CacheData(const CacheData& other);
+        ~CacheData();
+
+        void Deallocate();
+        void WaitOnCompletion();
+
+        uint8_t* GetDataLocation() const;
+        bool Active() const;
     };
 
     // singleton which holds the cache workers
     // and is the place where work will be submitted
-    class CacheCoordinator {
+    class Cache {
     public:
         // cache policy is defined as a type here to allow flexible usage of the cacher
         // given a numa destination node (where the data will be needed), the numa source
@@ -67,18 +96,14 @@
     private:
         std::shared_mutex cache_mutex_;
 
-        std::unordered_map<uint8_t*, CacheTask*> cache_state_;
+        std::unordered_map<uint8_t*, CacheData> cache_state_;
 
         CachePolicy* cache_policy_function_ = nullptr;
         CopyPolicy* copy_policy_function_ = nullptr;
 
         dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> ExecuteCopy(const uint8_t* src, uint8_t* dst, const size_t size, const int node) const;
 
-        void SubmitTask(CacheTask* task);
-        CacheTask* CreateTask(const uint8_t *data, const size_t size) const;
-        void DestroyTask(CacheTask* task) const;
+        void SubmitTask(CacheData* task);
 
     public:
         void Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function);
 
@@ -88,33 +113,23 @@
         // Immediate and ImmediateNoCache return a cache task
         // with guaranteed-valid result value where Relaxed
         // policy does not come with this guarantee.
-        CacheTask* Access(uint8_t* data, const size_t size, const ExecutionPolicy policy);
-
-        // waits upon completion of caching
-        static void WaitOnCompletion(CacheTask* task);
-
-        // invalidates the given pointer
-        // afterwards the reference to the
-        // cache task object may be forgotten
-        static void SignalDataUnused(CacheTask* task);
-
-        // returns the location of the cached data
-        // which may or may not be valid
-        static uint8_t* GetDataLocation(CacheTask* task);
+        std::unique_ptr<CacheData> Access(uint8_t* data, const size_t size, const ExecutionPolicy policy);
 
         void Flush();
     };
 }
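The CachePolicy and CopyPolicy typedefs themselves lie outside the changed hunks. Judging from the Init parameters above and the lambdas passed in main.cpp, they are plausibly function types along these lines; this is an assumption for illustration, not the file's actual declarations:

// assumed shape of the policy types consumed by Cache::Init; the real
// typedefs live in the unchanged part of offloading-cache.hpp
using CachePolicy = int(const int numa_dst_node, const int numa_src_node, const size_t data_size);
using CopyPolicy = std::vector<int>(const int numa_dst_node, const int numa_src_node);

// capture-less lambdas convert to pointers to these function types,
// which is how main.cpp hands its policies to Cache::Init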
-inline void offcache::CacheCoordinator::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) {
+inline void offcache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) {
     cache_policy_function_ = cache_policy_function;
     copy_policy_function_ = copy_policy_function;
 
     // initialize numa library
     numa_available();
+
+    std::cout << "[-] Cache Initialized" << std::endl;
 }
 
-inline offcache::CacheTask* offcache::CacheCoordinator::Access(uint8_t* data, const size_t size, const ExecutionPolicy policy) {
+inline std::unique_ptr<offcache::CacheData> offcache::Cache::Access(uint8_t* data, const size_t size, const ExecutionPolicy policy) {
     // the best situation is if this data is already cached
     // which we check in an unnamed block in which the cache
     // is locked for reading to prevent another thread
@@ -126,13 +141,16 @@ inline offcache::CacheTask* offcache::CacheCoordinator::Access(uint8_t* data, co
         const auto search = cache_state_.find(data);
 
         if (search != cache_state_.end()) {
-            if (search->second->size_ == size) {
-                search->second->active_.store(true);
-                // TODO: check for completed status depending on execution policy
-                return search->second;
+            if (search->second.size_ == size) {
+                search->second.active_->store(true);
+
+                std::cout << "[+] Found Cached version for 0x" << std::hex << (uint64_t)data << std::dec << std::endl;
+
+                return std::move(std::make_unique<CacheData>(search->second));
             }
             else {
-                DestroyTask(search->second);
+                std::cout << "[!] Found Cached version with size mismatch for 0x" << std::hex << (uint64_t)data << std::dec << std::endl;
+
                 cache_state_.erase(search);
             }
         }
@@ -141,7 +159,7 @@ inline offcache::CacheTask* offcache::CacheCoordinator::Access(uint8_t* data, co
     // at this point the requested data is not present in cache
     // and we create a caching task for it
-    CacheTask* task = CreateTask(data, size);
+    auto task = std::make_unique<CacheData>(data, size);
 
     if (policy == ExecutionPolicy::Immediate) {
         // in immediate mode the returned task
@@ -150,9 +168,9 @@ inline offcache::CacheTask* offcache::CacheCoordinator::Access(uint8_t* data, co
         // after which we submit the task
        // maybe_result is then set by submit
-        task->result_ = data;
-        SubmitTask(task);
-        return task;
+        task->cache_->store(data);
+        SubmitTask(task.get());
+        return std::move(task);
     }
     else if (policy == ExecutionPolicy::ImmediateNoCache) {
         // for immediatenocache we just validate
@@ -160,9 +178,9 @@ inline offcache::CacheTask* offcache::CacheCoordinator::Access(uint8_t* data, co
         // we must also set maybe_result in case
         // someone waits on this
-        task->result_ = data;
-        task->maybe_result_ = data;
-        return task;
+        task->cache_->store(data);
+        task->incomplete_cache_ = data;
+        return std::move(task);
     }
     else if (policy == ExecutionPolicy::Relaxed) {
         // for relaxed no valid task must be returned
@@ -170,15 +188,15 @@ inline offcache::CacheTask* offcache::CacheCoordinator::Access(uint8_t* data, co
         // the possible invalid task back with only
         // maybe_result set by submission
-        SubmitTask(task);
-        return task;
+        SubmitTask(task.get());
+        return std::move(task);
     }
     else {
         // this should not be reached
     }
 }
 
-inline void offcache::CacheCoordinator::SubmitTask(CacheTask* task) {
+inline void offcache::Cache::SubmitTask(CacheData* task) {
     // obtain numa node of current thread to determine where the data is needed
     const int current_cpu = sched_getcpu();
@@ -187,42 +205,72 @@ inline void offcache::CacheCoordinator::SubmitTask(CacheTask* task) {
     // obtain node that the given data pointer is allocated on
     int data_node = -1;
-    get_mempolicy(&data_node, NULL, 0, (void*)task->data_, MPOL_F_NODE | MPOL_F_ADDR);
+    get_mempolicy(&data_node, NULL, 0, (void*)task->src_, MPOL_F_NODE | MPOL_F_ADDR);
 
     // query cache policy function for the destination numa node
-    const uint32_t dst_node = cache_policy_function_(current_node, data_node, task->size_);
+    const int dst_node = cache_policy_function_(current_node, data_node, task->size_);
+
+    std::cout << "[+] Allocating " << task->size_ << "B on node " << dst_node << " for " << std::hex << (uint64_t)task->src_ << std::dec << std::endl;
 
     // allocate data on this node and flush the unused parts of the
     // cache if the operation fails and retry once
     // TODO: smarter flush strategy could keep some stuff cached
-    uint8_t* dst = numa_alloc_onnode(task->size_, dst_node);
+    uint8_t* dst = reinterpret_cast<uint8_t*>(numa_alloc_onnode(task->size_, dst_node));
 
     if (dst == nullptr) {
+        std::cout << "[!] First allocation try failed for " << task->size_ << "B on node " << dst_node << std::endl;
+
         Flush();
 
-        dst = numa_alloc_onnode(task->size_, dst_node);
+        dst = reinterpret_cast<uint8_t*>(numa_alloc_onnode(task->size_, dst_node));
 
         if (dst == nullptr) {
+            std::cout << "[x] Second allocation try failed for " << task->size_ << "B on node " << dst_node << std::endl;
+
            return;
         }
     }
 
-    task->maybe_result_ = dst;
+    task->incomplete_cache_ = dst;
 
     // query copy policy function for the nodes to use for the copy
     const std::vector<int> executing_nodes = copy_policy_function_(dst_node, data_node);
     const size_t task_count = executing_nodes.size();
 
-    // at this point the task may be added to the cache structure
-    // due to the task being initialized with the valid flag set to false
+    // each task will copy one fair part of the total size
+    // and in case the total size is not a multiple of the
+    // given task count the last node must copy the remainder
+    const size_t size = task->size_ / task_count;
+    const size_t last_size = size + task->size_ % task_count;
+
+    // save the current numa node mask to restore later
+    // as executing the copy task will place this thread
+    // on a different node
+    bitmask* nodemask = numa_get_run_node_mask();
+
+    for (uint32_t i = 0; i < task_count; i++) {
+        // the last sub-task copies the remainder on top of its fair share
+        const size_t local_size = i + 1 == task_count ? last_size : size;
+        const size_t local_offset = i * size;
+        const uint8_t* local_src = task->src_ + local_offset;
+        uint8_t* local_dst = dst + local_offset;
+
+        task->handlers_->emplace_back(ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i]));
+    }
+
+    // only at this point may the task be added to the control structure
+    // because adding it earlier could cause it to be returned for an
+    // access request while the handler-vector is not fully populated
+    // which could cause the wait-function to return prematurely
+    // TODO: this can be optimized because the abort is quite expensive
     {
         std::unique_lock<std::shared_mutex> lock(cache_mutex_);
 
-        const auto state = cache_state_.insert({task->data_, task});
+        const auto state = cache_state_.insert({task->src_, *task});
 
         // if state.second is false then no insertion took place
         // which means that concurrently with this thread
@@ -231,94 +279,127 @@ inline void offcache::CacheCoordinator::SubmitTask(CacheTask* task) {
         // TODO: abort is not the only way to handle this situation
         if (!state.second) {
+            std::cout << "[x] Found another cache instance for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl;
+
+            // first wait on all copy operations to be completed
+            task->WaitOnCompletion();
+
             // abort by doing the following steps
             // (1) free the allocated memory, (2) remove the "maybe result" as
             // we will not run the caching operation, (3) clear the sub tasks
             // for the very same reason, (4) set the result to the RAM-location
             numa_free(dst, task->size_);
-            task->maybe_result_ = nullptr;
-            task->result_ = task->data_;
+            task->incomplete_cache_ = nullptr;
+            task->cache_->store(task->src_);
+
+            std::cout << "[-] Abort completed for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl;
+
             return;
         }
     }
 
-    // each task will copy one fair part of the total size
-    // and in case the total size is not a factor of the
-    // given task count the last node must copy the remainder
-    const size_t size = task->size_ / task_count;
-    const size_t last_size = size + task->size_ % task_count;
-
-    // save the current numa node mask to restore later
-    // as executing the copy task will place this thread
-    // on a different node
-    const int nodemask = numa_get_run_node_mask();
-
-    for (uint32_t i = 0; i < task_count; i++) {
-        const size_t local_size = i + 1 == task_count ? size : last_size;
-        const size_t local_offset = i * size;
-        const uint8_t* local_src = task->data_ + local_offset;
-        uint8_t* local_dst = dst + local_offset;
-
-        const auto handler = ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i]);
-        task->handlers_.emplace_back(handler);
-    }
-
-    // set the valid flag of the task as all handlers
-    // required for completion signal are registered
-    task->valid_.store(true);
-    task->valid_.notify_all();
-
-    // restore the previous nodemask
-    numa_run_on_node_mask(nodemask);
-}
-
-inline dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> offcache::CacheCoordinator::ExecuteCopy(const uint8_t* src, uint8_t* dst, const size_t size, const int node) {
-    dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(src), size);
-    dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(dst), size);
-
-    numa_run_on_node(node);
-
-    return dml::submit<path>(dml::mem_copy.block_on_fault(), srcv, dstv);
-}
-
-inline offcache::CacheTask* offcache::CacheCoordinator::CreateTask(const uint8_t* data, const size_t size) const {
-    CacheTask* task = new CacheTask();
-    task->data_ = data;
-    task->size_ = size;
-    return task;
-}
-
-inline void offcache::CacheCoordinator::DestroyTask(CacheTask* task) const {
-    numa_free(task->result_, task->size_);
-    delete task;
-}
-
-inline void offcache::CacheCoordinator::WaitOnCompletion(CacheTask* task) {
-    task->valid_.wait(false);
-
-    for (auto& handler : task->handlers_) {
-        auto result = handler.get();
-        // TODO: handle the returned status code
-    }
-
-    task->handlers_.clear();
-}
-
-inline uint8_t* offcache::CacheCoordinator::GetDataLocation(CacheTask* task) {
-    return task->result_;
-}
-
-inline void offcache::CacheCoordinator::SignalDataUnused(CacheTask* task) {
-    task->active_.store(false);
-}
-
-inline void offcache::CacheCoordinator::Flush() {
-    // TODO: there probably is a better way to implement this flush
+    // restore the previous nodemask
+    numa_run_on_node_mask(nodemask);
+}
+
+inline dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> offcache::Cache::ExecuteCopy(const uint8_t* src, uint8_t* dst, const size_t size, const int node) const {
+    dml::const_data_view srcv = dml::make_view(src, size);
+    dml::data_view dstv = dml::make_view(dst, size);
+
+    numa_run_on_node(node);
+
+    return dml::submit<dml::automatic>(dml::mem_copy.block_on_fault(), srcv, dstv);
+}
+
+inline void offcache::CacheData::WaitOnCompletion() {
+    if (handlers_ == nullptr) {
+        std::cout << "[-] Waiting on cache-var-update for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
+
+        cache_->wait(nullptr);
+
+        std::cout << "[+] Finished waiting on cache-var-update for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
+    }
+    else {
+        std::cout << "[-] Waiting on handlers for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
+
+        for (auto& handler : *handlers_) {
+            auto result = handler.get();
+            // TODO: handle the returned status code
+        }
+
+        handlers_ = nullptr;
+
+        std::cout << "[+] Finished waiting on handlers for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
+
+        cache_->store(incomplete_cache_);
+        cache_->notify_all();
+    }
+}
+
+offcache::CacheData::CacheData(uint8_t* data, const size_t size) {
+    std::cout << "[-] New CacheData 0x" << std::hex << (uint64_t)data << std::dec << std::endl;
+
+    src_ = data;
+    size_ = size;
+    active_ = new std::atomic<int32_t>();
+    cache_ = new std::atomic<uint8_t*>();
+    incomplete_cache_ = nullptr;
+    handlers_ = std::make_unique<std::vector<dml_handler>>();
+}
+
+offcache::CacheData::CacheData(const offcache::CacheData& other) {
+    std::cout << "[-] Copy Created for CacheData 0x" << std::hex << (uint64_t)other.src_ << std::dec << std::endl;
+
+    src_ = other.src_;
+    size_ = other.size_;
+    cache_ = other.cache_;
+    active_ = other.active_;
+    incomplete_cache_ = nullptr;
+    handlers_ = nullptr;
+
+    active_->fetch_add(1);
+}
+
+offcache::CacheData::~CacheData() {
+    std::cout << "[-] Destructor for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
+
+    const int32_t v = active_->fetch_sub(1);
+
+    // if the returned value is non-positive
+    // then we must execute proper deletion
+    // as this was the last reference
+    if (v <= 0) {
+        std::cout << "[!] Full Destructor for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
+
+        Deallocate();
+
+        delete active_;
+        delete cache_;
+    }
+}
+
+void offcache::CacheData::Deallocate() {
+    std::cout << "[!] Deallocating for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
+
+    // free the cached buffer itself, not the atomic that holds its address
+    numa_free(cache_->load(), size_);
+    cache_->store(nullptr);
+    incomplete_cache_ = nullptr;
+}
+
+uint8_t *offcache::CacheData::GetDataLocation() const {
+    return cache_->load();
+}
+
+bool offcache::CacheData::Active() const {
+    return active_->load() > 0;
+}
+
+inline void offcache::Cache::Flush() {
+    std::cout << "[-] Flushing Cache" << std::endl;
+
+    // TODO: there is a better way to implement this flush
     {
         std::unique_lock<std::shared_mutex> lock(cache_mutex_);
 
@@ -326,8 +407,7 @@ inline void offcache::CacheCoordinator::Flush() {
         auto it = cache_state_.begin();
 
         while (it != cache_state_.end()) {
-            if (it->second->active_.load() == false) {
-                DestroyTask(it->second);
+            if (it->second.Active() == false) {
                 cache_state_.erase(it);
                 it = cache_state_.begin();
             }
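The new CacheData relies on C++20 std::atomic wait/notify for completion signalling: copies that carry no handlers block on the shared atomic cache pointer, while the copy that owns the handlers publishes the finished buffer and wakes them. A reduced, self-contained illustration of that handshake using the same primitives as the code above (names are illustrative, not part of the commit):

// minimal illustration of the cache_-pointer handshake used by CacheData
#include <atomic>
#include <cstdint>
#include <thread>

int main() {
    std::atomic<uint8_t*> cache{ nullptr };
    uint8_t buffer[64]{};

    std::thread waiter([&] {
        cache.wait(nullptr);            // blocks while the value is still nullptr
        uint8_t* ready = cache.load();  // equivalent of GetDataLocation()
        (void)ready;
    });

    cache.store(buffer);                // equivalent of cache_->store(incomplete_cache_)
    cache.notify_all();                 // wakes every blocked WaitOnCompletion()

    waiter.join();
    return 0;
}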
