From 623366433bfca316ab932fddd0c0cfcfff2c6f77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Constantin=20F=C3=BCrst?= Date: Tue, 9 Jan 2024 18:18:11 +0100 Subject: [PATCH] continue modifying the declarations for the cacher and providing some first definitions --- offloading-cacher/offloading-cache.hpp | 101 ++++++++++++++++++++----- 1 file changed, 82 insertions(+), 19 deletions(-) diff --git a/offloading-cacher/offloading-cache.hpp b/offloading-cacher/offloading-cache.hpp index 613d498..9c2967a 100644 --- a/offloading-cacher/offloading-cache.hpp +++ b/offloading-cacher/offloading-cache.hpp @@ -2,13 +2,39 @@ #include #include +#include #include +#include + #include #include namespace offcache { + // execution policy selects in which way the data is supposed to be cached + // and returned with the following behaviour is guaranteed in addition to the + // returned value being valid: + // Immediate: return as fast as possible + // may return cached data, can return data in RAM + // will trigger caching of the data provided + // ImmediateNoCache: return as fast as possible and never trigger caching + // same as Immediate but will not trigger caching + // Relaxed: no rapid return needed, take time + // will trigger caching and may only return + // once the caching is successful but can still + // provide data in RAM + enum class ExecutionPolicy { + Relaxed, Immediate, ImmediateNoCache + }; + + struct WorkerTask { + uint8_t* src_; + uint8_t* dst_; + size_t size_; + std::atomic completed_ { false }; + }; + // the cache task structure will be used to submit and // control a cache element, while providing source pointer // and size in bytes for submission @@ -16,35 +42,29 @@ namespace offcache { // then the submitting thread may wait on the atomic "result" // which will be notified by the cache worker upon processing // after which the atomic-bool-ptr active will also become valid - // - // the data pointed to by result and the bool-ptr are guaranteed - // to remain valid until the value pointed to by active is changed - // to false, after which the worker may clean up and delete the - // structure - carefull, do not call delete on this, the worker does struct CacheTask { uint8_t* data_; size_t size_; - std::atomic result_ { nullptr }; - std::atomic* active_; + ExecutionPolicy policy_; + uint8_t* result_; + std::atomic active_; + std::vector sub_tasks_; }; // worker class, one for each numa node // discovers its node configuration on startup // and keeps track of available memory class CacheWorker { - private: + public: uint8_t numa_node_ = 0; - std::unordered_map cache_info_; - - public: // this is the mailbox of the worker to which a new task // may be submitted by exchanging nullptr with a valid one // and notifying on the atomic after which ownership // of the CacheTask structure is transferred to the worker - std::atomic* task_slot_ = nullptr; + std::atomic* task_slot_ = nullptr; - static void run(CacheWorker* this_, const uint8_t numa_node); + static void run(CacheWorker* this_); }; // singleton which holds the cache workers @@ -64,11 +84,11 @@ namespace offcache { // or choosing a conservative usage policy typedef std::vector (CopyPolicy)(const uint8_t numa_dst_node, const uint8_t numa_src_node); - enum class ExecutionPolicy { - Immediate, Relaxed, NoCache - }; - private: + std::unordered_map workers_; + + std::unordered_map cache_state_; + CachePolicy* cache_policy_function_ = nullptr; CopyPolicy* copy_policy_function_ = nullptr; @@ -78,7 +98,50 @@ namespace offcache { // submits the given task and takes ownership of the pointer void SubmitTask(CacheTask* task, const ExecutionPolicy policy) const; - static void WaitOnCompletion(CacheTask* task); + // waits upon completion of caching + // returns the location of the data + static uint8_t* WaitOnCompletion(CacheTask* task); + + // invalidates the given pointer static void SignalDataUnused(CacheTask* task); }; -} \ No newline at end of file +} + +void offcache::CacheWorker::run(CacheWorker* this_) { + +} + +void offcache::CacheCoordinator::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) { + cache_policy_function_ = cache_policy_function; + copy_policy_function_ = copy_policy_function; + + // initialize numa library + numa_available(); + + const uint8_t nodes_max = numa_num_configured_nodes(); + const uint8_t valid_nodes = numa_get_mems_allowed(); + + for (uint8_t node = 0; node < nodes_max; node++) { + if (numa_bitmask_isbitset(valid_nodes, node)) { + workers_.insert({ node, CacheWorker() }); + workers_[node].numa_node_ = node; + std::thread t (CacheWorker::run, &workers_[node]); + t.detach(); + } + } +} + +void offcache::CacheCoordinator::SubmitTask(CacheTask* task, const ExecutionPolicy policy) const { + +} + +uint8_t* offcache::CacheCoordinator::WaitOnCompletion(CacheTask* task) { + while (!task->sub_tasks_.empty()) { + task->sub_tasks_.back().completed_.wait(false); + task->sub_tasks_.pop_back(); + } +} + +void offcache::CacheCoordinator::SignalDataUnused(CacheTask* task) { + task->active_.store(false); +}