From 5e30a370ceb12cf75d350e7075645ffff6cb8475 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Constantin=20F=C3=BCrst?=
Date: Tue, 9 Jan 2024 23:49:22 +0100
Subject: [PATCH] finish first stage of caching implementation and provide a
 rudimentary test function in the main

---
 offloading-cacher/benchmark.hpp        | 174 ---------------
 offloading-cacher/main.cpp             |  80 ++++---
 offloading-cacher/offloading-cache.hpp | 290 ++++++++++++++++---------
 3 files changed, 236 insertions(+), 308 deletions(-)
 delete mode 100644 offloading-cacher/benchmark.hpp

diff --git a/offloading-cacher/benchmark.hpp b/offloading-cacher/benchmark.hpp
deleted file mode 100644
index 550efc2..0000000
--- a/offloading-cacher/benchmark.hpp
+++ /dev/null
@@ -1,174 +0,0 @@
-#pragma once
-
-#include <iostream>
-#include <vector>
-#include <chrono>
-#include <cstring>
-
-#include <pthread.h>
-#include <semaphore.h>
-#include <numa.h>
-
-#include <dml/dml.hpp>
-
-#include "util/barrier.hpp"
-#include "util/dml-helper.hpp"
-#include "util/task-data.hpp"
-
-#define LOG_CODE_INFO "Location: " << __FILE__ << "@" << __LINE__ << "::" << __FUNCTION__ << std::endl
-#define LOG_ERR { pthread_t t = pthread_self(); std::cerr << "--- BEGIN ERROR MSG ---" << std::endl << "Physical: [Node " << args->numa_node << " | Thread " << t << "]" << std::endl; } std::cerr << LOG_CODE_INFO
-#define CHECK_STATUS(status,msg) { if (status != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(status) << std::endl << msg << std::endl; args->status = status; return nullptr; }}
-
-#define ADD_TIMING_MESSUREMENT { if (i >= 5) { args->submit_duration.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(se - st).count()); args->complete_duration.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(et - se).count()); args->combined_duration.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(et - st).count());}}
-
-template <typename path>
-void* thread_function(void* argp) {
-  TaskData* args = reinterpret_cast<TaskData*>(argp);
-
-  // set numa node and core affinity of the current thread
-  numa_run_on_node(args->numa_node);
-
-  // allocate memory for the move operation on the requested numa nodes
-  void* src = numa_alloc_onnode(args->size, args->nnode_src);
-  void* dst = numa_alloc_onnode(args->size, args->nnode_dst);
-  dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(src), args->size);
-  dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(dst), args->size);
-
-  std::memset(src, 0, args->size);
-  std::memset(dst, 0, args->size);
-
-  args->status = dml::status_code::ok;
-  args->rep_completed = 0;
-
-  std::chrono::time_point<std::chrono::steady_clock> tps;
-
-  // we add 5 as the first 5 iterations will not be meassured
-  // to remove exceptional values encountered during warmup
-  for (uint32_t i = 0; i < args->rep_count + 5; i++) {
-    // synchronize the start of each iteration
-    // using the barrier structure
-    args->barrier_->wait();
-
-    if (args->batch_submit) {
-      const auto st = std::chrono::steady_clock::now();
-
-      auto sequence = dml::sequence(args->batch_size, std::allocator<uint8_t>());
-
-      for (uint32_t j = 0; j < args->batch_size; j++) {
-        // block_on_fault() is required to submit the task in a way so that the
-        // DSA engine can handle page faults itself together with the IOMMU which
-        // requires the WQ to be configured to allow this too
-
-        const auto status = sequence.add(dml::mem_copy.block_on_fault(), srcv, dstv);
-        CHECK_STATUS(status, "Adding operation to batch failed!");
-      }
-
-      // we use the asynchronous submit-routine even though this is not required
-      // here, however the project later on will only use async operation and
-      // therefore this behaviour should be benchmarked
-
-      auto handler = dml::submit<path>(dml::batch, sequence);
-
-      const auto se = std::chrono::steady_clock::now();
-
-      auto result = handler.get();
-
-      const auto et = std::chrono::steady_clock::now();
-
-      const dml::status_code status = result.status;
-      CHECK_STATUS(status, "Batch completed with an Error!");
-
-      ADD_TIMING_MESSUREMENT;
-    }
-    else if (args->batch_size > 1) {
-      // implementation for non-batched batch submit follows here
-      // this means we submit a bunch of work as single descriptors
-      // but then dont wait for the completion immediately
-
-      std::vector<dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>>> handlers;
-
-      const auto st = std::chrono::steady_clock::now();
-
-      for (uint32_t j = 0; j < args->batch_size; j++) {
-        // block_on_fault() is required to submit the task in a way so that the
-        // DSA engine can handle page faults itself together with the IOMMU which
-        // requires the WQ to be configured to allow this too
-
-        handlers.emplace_back(dml::submit<path>(dml::mem_copy.block_on_fault(), srcv, dstv));
-      }
-
-      const auto se = std::chrono::steady_clock::now();
-
-      for (auto& handler : handlers) {
-        auto result = handler.get();
-        const dml::status_code status = result.status;
-        CHECK_STATUS(status, "Operation completed with an Error!");
-      }
-
-      const auto et = std::chrono::steady_clock::now();
-
-      ADD_TIMING_MESSUREMENT;
-    }
-    else {
-      const auto st = std::chrono::steady_clock::now();
-
-      // we use the asynchronous submit-routine even though this is not required
-      // here, however the project later on will only use async operation and
-      // therefore this behaviour should be benchmarked
-      // block_on_fault() is required to submit the task in a way so that the
-      // DSA engine can handle page faults itself together with the IOMMU which
-      // requires the WQ to be configured to allow this too
-      auto handler = dml::submit<path>(dml::mem_copy.block_on_fault(), srcv, dstv);
-
-      const auto se = std::chrono::steady_clock::now();
-
-      auto result = handler.get();
-
-      const auto et = std::chrono::steady_clock::now();
-
-      const dml::status_code status = result.status;
-      CHECK_STATUS(status, "Operation completed with an Error!");
-
-      ADD_TIMING_MESSUREMENT;
-    }
-
-    // again: we do not count the first 5 repetitions
-    if (i == 5) tps = std::chrono::steady_clock::now();
-    if (i >= 5) args->rep_completed++;
-  }
-
-  const auto tpe = std::chrono::steady_clock::now();
-
-  args->total_time = std::chrono::duration_cast<std::chrono::nanoseconds>(tpe - tps).count();
-
-  // free the allocated memory regions on the selected nodes
-  numa_free(src, args->size);
-  numa_free(dst, args->size);
-
-  return nullptr;
-}
-
-template <typename path>
-void execute_dml_memcpy(std::vector<TaskData>& args) {
-  barrier task_barrier(args.size());
-  std::vector<pthread_t> threads;
-
-  // initialize numa library
-  numa_available();
-
-  // for each submitted task we link the semaphore
-  // and create the thread, passing the argument
-  for (auto& arg : args) {
-    arg.barrier_ = &task_barrier;
-    threads.emplace_back();
-
-    if (pthread_create(&threads.back(), nullptr, thread_function<path>, &arg) != 0) {
-      std::cerr << "Error creating thread" << std::endl;
-      exit(1);
-    }
-  }
-
-  for (pthread_t& t : threads) {
-    pthread_join(t, nullptr);
-  }
-}
\ No newline at end of file
diff --git a/offloading-cacher/main.cpp b/offloading-cacher/main.cpp
index f49f1f1..b6c9714 100644
--- a/offloading-cacher/main.cpp
+++ b/offloading-cacher/main.cpp
@@ -1,42 +1,64 @@
-#include <fstream>
-
-#include <vector>
 #include <iostream>
-#include <string>
+#include <random>
+#include <limits>
 
-#include "benchmark.hpp"
+#include "offloading-cache.hpp"
 
-int main(int argc, char **argv) {
-  if (argc < 3) {
-    std::cout << "Missing input and output file names." << std::endl;
-    std::cout << "Usage: ./benchmarks [input.json] [output.json]" << std::endl;
-    return 1;
+double* GetRandomArray(const size_t size) {
+  double* array = new double[size];
+
+  std::uniform_real_distribution<double> unif(std::numeric_limits<double>::min(), std::numeric_limits<double>::max());
+  std::default_random_engine re;
+
+  for (size_t i = 0; i < size; i++) {
+    array[i] = unif(re);
   }
 
-  const std::string input = argv[1];
-  const std::string output = argv[2];
+  return array;
+}
 
-  std::string path;
-  std::vector<TaskData> args;
-
-  std::ifstream is(input);
-  ReadWorkDescription(args, path, is);
-  is.close();
+bool IsEqual(const double* a, const double* b, const size_t size) {
+  // a plain element-wise comparison suffices here; comparing
+  // doubles cannot throw, so no try/catch is required
+  for (size_t i = 0; i < size; i++) {
+    if (a[i] != b[i]) return false;
+  }
+
+  return true;
+}
 
-  if (path == "hw") {
-    execute_dml_memcpy<dml::hardware>(args);
-  }
-  else if (path == "sw") {
-    execute_dml_memcpy<dml::software>(args);
-  }
-  else if (path == "auto") {
-    execute_dml_memcpy<dml::automatic>(args);
-  }
-  else {
-    std::cerr << "Path is neither hw/sw/auto." << std::endl;
-  }
+int main(int argc, char **argv) {
+  offcache::Cache cache;
+
+  auto cache_policy = [](const int numa_dst_node, const int numa_src_node, const size_t data_size) {
+    return numa_dst_node;
+  };
+
+  auto copy_policy = [](const int numa_dst_node, const int numa_src_node) {
+    return std::vector<int>{ numa_src_node };
+  };
+
+  cache.Init(cache_policy, copy_policy);
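+
+  // editorial illustration (not part of the original change): the cache
+  // policy picks the numa node that receives the cached copy, while the
+  // copy policy picks the node(s) that execute the copy - returning more
+  // than one node makes SubmitTask() split the copy into one chunk per
+  // node, so a hypothetical two-engine split could look like this:
+  //
+  //   auto split_copy_policy = [](const int numa_dst_node, const int numa_src_node) {
+  //     return std::vector<int>{ numa_src_node, numa_dst_node };
+  //   };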
+
+  static constexpr size_t data_size = 8192;
+  double* data = GetRandomArray(data_size);
+
+  std::unique_ptr<offcache::CacheData> data_cache = cache.Access(reinterpret_cast<uint8_t*>(data), data_size * sizeof(double), offcache::ExecutionPolicy::Relaxed);
+
+  data_cache->WaitOnCompletion();
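+
+  // note (editorial): with ExecutionPolicy::Relaxed the returned CacheData
+  // carries no validity guarantee, so this wait is required before reading
+  // the location - it blocks until the copy handlers have completed and the
+  // final cache pointer has been published (see CacheData::WaitOnCompletion
+  // in offloading-cache.hpp)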
+
+  double* cached = reinterpret_cast<double*>(data_cache->GetDataLocation());
+
+  if (data == cached) {
+    std::cout << "Caching did not affect data location." << std::endl;
+  }
+
+  if (IsEqual(data, cached, data_size)) {
+    std::cout << "Cached data is correct." << std::endl;
+  }
+  else {
+    std::cout << "Cached data is wrong." << std::endl;
+  }
-
-  std::ofstream os(output);
-  WriteResultLog(args, path, os);
-  os.close();
 }
diff --git a/offloading-cacher/offloading-cache.hpp b/offloading-cacher/offloading-cache.hpp
index d937fb8..f40ef3d 100644
--- a/offloading-cacher/offloading-cache.hpp
+++ b/offloading-cacher/offloading-cache.hpp
@@ -1,15 +1,20 @@
 #pragma once
 
+#include <iostream>
+
 #include <atomic>
 #include <cstdint>
 #include <vector>
 #include <unordered_map>
 #include <shared_mutex>
+#include <mutex>
+#include <memory>
 
 #include <sched.h>
 #include <numa.h>
+#include <numaif.h>
 
 #include <dml/dml.hpp>
@@ -30,6 +35,8 @@ namespace offcache {
     Relaxed, Immediate, ImmediateNoCache
   };
 
+  class Cache;
+
   // the cache task structure will be used to submit and
   // control a cache element, while providing source pointer
   // and size in bytes for submission
@@ -37,19 +44,41 @@ namespace offcache {
   // then the submitting thread may wait on the atomic "result"
   // which will be notified by the cache worker upon processing
   // after which the atomic-bool-ptr active will also become valid
-  struct CacheTask {
-    uint8_t* data_;
+  class CacheData {
+  public:
+    using dml_handler = dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>>;
+
+  private:
+    uint8_t* src_;
     size_t size_;
-    uint8_t* result_ = nullptr;
-    uint8_t* maybe_result_ = nullptr;
-    std::atomic<bool> active_ { true };
-    std::atomic<bool> valid_ { false };
-    std::vector<dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>>> handlers_;
+
+    std::atomic<int32_t>* active_;
+
+  protected:
+    std::atomic<uint8_t*>* cache_;
+
+    uint8_t* incomplete_cache_;
+
+    std::unique_ptr<std::vector<dml_handler>> handlers_;
+
+    friend Cache;
+
+  public:
+    CacheData(uint8_t* data, const size_t size);
+    CacheData(const CacheData& other);
+    ~CacheData();
+
+    void Deallocate();
+    void WaitOnCompletion();
+
+    uint8_t* GetDataLocation() const;
+
+    bool Active() const;
   };
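+
+  // editorial note on ownership (illustration, not in the original file):
+  // CacheData is a shared handle - the copy constructor increments *active_
+  // and the destructor decrements it, so the instance stored inside the
+  // Cache map and every handle returned by Access() share one counter and
+  // one cache pointer, while the handler list stays with the instance that
+  // submitted the copy; only the destructor of the last remaining handle
+  // performs Deallocate()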
 
   // singleton which holds the cache workers
   // and is the place where work will be submitted
-  class CacheCoordinator {
+  class Cache {
   public:
     // cache policy is defined as a type here to allow flexible usage of the cacher
     // given a numa destination node (where the data will be needed), the numa source
@@ -67,18 +96,14 @@
   private:
     std::shared_mutex cache_mutex_;
 
-    std::unordered_map<uint8_t*, CacheTask*> cache_state_;
+    std::unordered_map<uint8_t*, CacheData> cache_state_;
 
     CachePolicy* cache_policy_function_ = nullptr;
     CopyPolicy* copy_policy_function_ = nullptr;
 
     dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> ExecuteCopy(const uint8_t* src, uint8_t* dst, const size_t size, const int node) const;
 
-    void SubmitTask(CacheTask* task);
-
-    CacheTask* CreateTask(const uint8_t *data, const size_t size) const;
-
-    void DestroyTask(CacheTask* task) const;
+    void SubmitTask(CacheData* task);
 
   public:
     void Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function);
@@ -88,33 +113,23 @@
     // Immediate and ImmediateNoCache return a cache task
     // with guaranteed-valid result value where Relaxed
    // policy does not come with this guarantee.
-    CacheTask* Access(uint8_t* data, const size_t size, const ExecutionPolicy policy);
-
-    // waits upon completion of caching
-    static void WaitOnCompletion(CacheTask* task);
-
-    // invalidates the given pointer
-    // afterwards the reference to the
-    // cache task object may be forgotten
-    static void SignalDataUnused(CacheTask* task);
-
-    // returns the location of the cached data
-    // which may or may not be valid
-    static uint8_t* GetDataLocation(CacheTask* task);
+    std::unique_ptr<CacheData> Access(uint8_t* data, const size_t size, const ExecutionPolicy policy);
 
     void Flush();
   };
 }
 
-inline void offcache::CacheCoordinator::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) {
+inline void offcache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) {
   cache_policy_function_ = cache_policy_function;
   copy_policy_function_ = copy_policy_function;
 
   // initialize numa library
   numa_available();
+
+  std::cout << "[-] Cache Initialized" << std::endl;
 }
 
-inline offcache::CacheTask* offcache::CacheCoordinator::Access(uint8_t* data, const size_t size, const ExecutionPolicy policy) {
+inline std::unique_ptr<offcache::CacheData> offcache::Cache::Access(uint8_t* data, const size_t size, const ExecutionPolicy policy) {
   // the best situation is if this data is already cached
   // which we check in an unnamed block in which the cache
   // is locked for reading to prevent another thread
@@ -126,13 +141,16 @@
     const auto search = cache_state_.find(data);
 
     if (search != cache_state_.end()) {
-      if (search->second->size_ == size) {
-        search->second->active_.store(true);
-        // TODO: check for completed status depending on execution policy
-        return search->second;
+      if (search->second.size_ == size) {
+        std::cout << "[+] Found Cached version for 0x" << std::hex << (uint64_t)data << std::dec << std::endl;
+
+        // the copy constructor increments the shared reference counter,
+        // so no explicit store to active_ is required here - a plain
+        // store(true) would reset the count and leak the entry later
+        return std::make_unique<CacheData>(search->second);
       }
       else {
+        std::cout << "[!] Found Cached version with size mismatch for 0x" << std::hex << (uint64_t)data << std::dec << std::endl;
+
         cache_state_.erase(search);
       }
     }
@@ -141,7 +159,7 @@
   // at this point the requested data is not present in cache
   // and we create a caching task for it
 
-  CacheTask* task = CreateTask(data, size);
+  auto task = std::make_unique<CacheData>(data, size);
 
   if (policy == ExecutionPolicy::Immediate) {
     // in immediate mode the returned task
     // will be valid right away as we set
     // the cache pointer to the data location
     // after which we submit the task
-    // maybe_result is then set by submit
+    // incomplete_cache is then set by submit
 
-    task->result_ = data;
-    SubmitTask(task);
-    return task;
+    task->cache_->store(data);
+    SubmitTask(task.get());
+    return task;
   }
   else if (policy == ExecutionPolicy::ImmediateNoCache) {
     // for immediatenocache we just validate
     // the task and return it, as no caching
     // will be performed / submitted
-    // we must also set maybe_result in case
+    // we must also set incomplete_cache in case
     // someone waits on this
 
-    task->result_ = data;
-    task->maybe_result_ = data;
-    return task;
+    task->cache_->store(data);
+    task->incomplete_cache_ = data;
+    return task;
   }
   else if (policy == ExecutionPolicy::Relaxed) {
     // for relaxed no valid task must be returned
     // and therefore we just submit and then give
-    // the possible invalid task back with only
-    // maybe_result set by submission
+    // the possibly invalid task back with only
+    // incomplete_cache set by submission
 
-    SubmitTask(task);
-    return task;
+    SubmitTask(task.get());
+    return task;
   }
   else {
     // this should not be reached, but returning here
     // avoids falling off the end of a non-void function
+    return nullptr;
   }
 }
 
-inline void offcache::CacheCoordinator::SubmitTask(CacheTask* task) {
+inline void offcache::Cache::SubmitTask(CacheData* task) {
   // obtain numa node of current thread to determine where the data is needed
 
   const int current_cpu = sched_getcpu();
@@ -187,42 +205,72 @@
   // obtain node that the given data pointer is allocated on
 
   int data_node = -1;
-  get_mempolicy(&data_node, NULL, 0, (void*)task->data_, MPOL_F_NODE | MPOL_F_ADDR);
+  get_mempolicy(&data_node, NULL, 0, (void*)task->src_, MPOL_F_NODE | MPOL_F_ADDR);
 
   // query cache policy function for the destination numa node
 
-  const uint32_t dst_node = cache_policy_function_(current_node, data_node, task->size_);
+  const int dst_node = cache_policy_function_(current_node, data_node, task->size_);
+
+  std::cout << "[+] Allocating " << task->size_ << "B on node " << dst_node << " for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl;
 
   // allocate data on this node and flush the unused parts of the
   // cache if the operation fails and retry once
   // TODO: smarter flush strategy could keep some stuff cached
 
-  uint8_t* dst = numa_alloc_onnode(task->size_, dst_node);
+  uint8_t* dst = reinterpret_cast<uint8_t*>(numa_alloc_onnode(task->size_, dst_node));
 
   if (dst == nullptr) {
+    std::cout << "[!] First allocation try failed for " << task->size_ << "B on node " << dst_node << std::endl;
+
     Flush();
 
-    dst = numa_alloc_onnode(task->size_, dst_node);
+    dst = reinterpret_cast<uint8_t*>(numa_alloc_onnode(task->size_, dst_node));
 
     if (dst == nullptr) {
+      std::cout << "[x] Second allocation try failed for " << task->size_ << "B on node " << dst_node << std::endl;
+
       return;
     }
   }
 
-  task->maybe_result_ = dst;
+  task->incomplete_cache_ = dst;
 
   // query copy policy function for the nodes to use for the copy
 
   const std::vector<int> executing_nodes = copy_policy_function_(dst_node, data_node);
   const size_t task_count = executing_nodes.size();
 
-  // at this point the task may be added to the cache structure
-  // due to the task being initialized with the valid flag set to false
+  // each task will copy one fair share of the total size
+  // and in case the total size is not evenly divisible by
+  // the task count the last node must copy the remainder
+
+  const size_t size = task->size_ / task_count;
+  const size_t last_size = size + task->size_ % task_count;
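+
+  // worked example (editorial illustration): task->size_ = 100 B split
+  // over task_count = 3 nodes gives size = 33 and last_size = 34, so the
+  // chunks cover offsets [0,33), [33,66) and [66,100)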
+
+  // save the current numa run-node mask to restore later
+  // as executing the copy task will place this thread
+  // on a different node
+
+  bitmask* nodemask = numa_get_run_node_mask();
+
+  for (uint32_t i = 0; i < task_count; i++) {
+    // the last chunk carries the remainder, all others the fair share
+    const size_t local_size = i + 1 == task_count ? last_size : size;
+    const size_t local_offset = i * size;
+    const uint8_t* local_src = task->src_ + local_offset;
+    uint8_t* local_dst = dst + local_offset;
+
+    task->handlers_->emplace_back(ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i]));
+  }
+
+  // only at this point may the task be added to the control structure
+  // because adding it earlier could cause it to be returned for an
+  // access request while the handler-vector is not fully populated
+  // which could cause the wait-function to return prematurely
+  // TODO: this can be optimized because the abort is quite expensive
 
   {
     std::unique_lock<std::shared_mutex> lock(cache_mutex_);
 
-    const auto state = cache_state_.insert({task->data_, task});
+    const auto state = cache_state_.insert({task->src_, *task});
 
     // if state.second is false then no insertion took place
     // which means that concurrently with this thread
@@ -231,94 +279,127 @@
     // TODO: abort is not the only way to handle this situation
 
     if (!state.second) {
+      std::cout << "[x] Found another cache instance for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl;
+
+      // first wait on all copy operations to be completed
+
+      task->WaitOnCompletion();
+
       // abort by doing the following steps
-      // (1) free the allocated memory, (2) remove the "maybe result" as
+      // (1) free the allocated memory, (2) remove the "incomplete cache" as
       // we will not run the caching operation, (3) clear the sub tasks
       // for the very same reason, (4) set the result to the RAM-location
 
       numa_free(dst, task->size_);
-      task->maybe_result_ = nullptr;
-      task->result_ = task->data_;
+      task->incomplete_cache_ = nullptr;
+      task->cache_->store(task->src_);
+
+      std::cout << "[-] Abort completed for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl;
+
+      // restore the run-node mask here too, as the copy
+      // submissions above have already moved this thread
+      numa_run_on_node_mask(nodemask);
+      numa_free_nodemask(nodemask);
+
       return;
     }
   }
 
-  // each task will copy one fair part of the total size
-  // and in case the total size is not a factor of the
-  // given task count the last node must copy the remainder
-
-  const size_t size = task->size_ / task_count;
-  const size_t last_size = size + task->size_ % task_count;
-
-  // save the current numa node mask to restore later
-  // as executing the copy task will place this thread
-  // on a different node
-
-  const int nodemask = numa_get_run_node_mask();
-
-  for (uint32_t i = 0; i < task_count; i++) {
-    const size_t local_size = i + 1 == task_count ? size : last_size;
-    const size_t local_offset = i * size;
-    const uint8_t* local_src = task->data_ + local_offset;
-    uint8_t* local_dst = dst + local_offset;
-
-    const auto handler = ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i]);
-    task->handlers_.emplace_back(handler);
-  }
-
-  // set the valid flag of the task as all handlers
-  // required for completion signal are registered
-
-  task->valid_.store(true);
-  task->valid_.notify_all();
-
   // restore the previous nodemask
 
   numa_run_on_node_mask(nodemask);
+  numa_free_nodemask(nodemask);
 }
 
-inline dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> offcache::CacheCoordinator::ExecuteCopy(const uint8_t* src, uint8_t* dst, const size_t size, const int node) {
-  dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(src), size);
-  dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(dst), size);
+inline dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> offcache::Cache::ExecuteCopy(const uint8_t* src, uint8_t* dst, const size_t size, const int node) const {
+  dml::const_data_view srcv = dml::make_view(src, size);
+  dml::data_view dstv = dml::make_view(dst, size);
 
   numa_run_on_node(node);
 
   return dml::submit<dml::automatic>(dml::mem_copy.block_on_fault(), srcv, dstv);
 }
 
+inline void offcache::CacheData::WaitOnCompletion() {
+  if (handlers_ == nullptr) {
+    std::cout << "[-] Waiting on cache-var-update for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
+
+    cache_->wait(nullptr);
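+
+    // editorial note: std::atomic<T*>::wait (C++20) blocks while the
+    // stored value still equals the argument, so this line returns only
+    // once another thread has stored the final location and called
+    // notify_all() below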
+
+    std::cout << "[+] Finished waiting on cache-var-update for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
+  }
+  else {
+    std::cout << "[-] Waiting on handlers for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
+
+    for (auto& handler : *handlers_) {
+      auto result = handler.get();
+      // TODO: handle the returned status code
+    }
+
+    handlers_ = nullptr;
+
+    std::cout << "[+] Finished waiting on handlers for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
+
+    cache_->store(incomplete_cache_);
+    cache_->notify_all();
+  }
+}
 
-inline offcache::CacheTask* offcache::CacheCoordinator::CreateTask(const uint8_t* data, const size_t size) const {
-  CacheTask* task = new CacheTask();
-  task->data_ = data;
-  task->size_ = size;
-  return task;
-}
+inline offcache::CacheData::CacheData(uint8_t* data, const size_t size) {
+  std::cout << "[-] New CacheData 0x" << std::hex << (uint64_t)data << std::dec << std::endl;
+
+  src_ = data;
+  size_ = size;
+
+  // value-initialized to zero: only copies of this
+  // handle are counted, the original itself is not
+  active_ = new std::atomic<int32_t>();
+  cache_ = new std::atomic<uint8_t*>();
+  incomplete_cache_ = nullptr;
+  handlers_ = std::make_unique<std::vector<dml_handler>>();
+}
 
-inline void offcache::CacheCoordinator::DestroyTask(CacheTask* task) const {
-  numa_free(task->result_, task->size_);
-  delete task;
-}
+inline offcache::CacheData::CacheData(const offcache::CacheData& other) {
+  std::cout << "[-] Copy Created for CacheData 0x" << std::hex << (uint64_t)other.src_ << std::dec << std::endl;
+
+  src_ = other.src_;
+  size_ = other.size_;
+  cache_ = other.cache_;
+  active_ = other.active_;
+  incomplete_cache_ = nullptr;
+  handlers_ = nullptr;
+  active_->fetch_add(1);
+}
 
-inline void offcache::CacheCoordinator::WaitOnCompletion(CacheTask* task) {
-  task->valid_.wait(false);
-
-  for (auto& handler : task->handlers_) {
-    auto result = handler.get();
-    // TODO: handle the returned status code
-  }
-
-  task->handlers_.clear();
-}
+inline offcache::CacheData::~CacheData() {
+  std::cout << "[-] Destructor for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
+
+  // fetch_sub returns the value held before the decrement,
+  // so the last remaining reference observes zero here
+
+  const int32_t v = active_->fetch_sub(1);
+
+  // if the returned value is non-positive
+  // then we must execute proper deletion
+  // as this was the last reference
+
+  if (v <= 0) {
+    std::cout << "[!] Full Destructor for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
+
+    Deallocate();
+
+    delete active_;
+    delete cache_;
+  }
+}
 
-inline uint8_t* offcache::CacheCoordinator::GetDataLocation(CacheTask* task) {
-  return task->result_;
-}
+inline void offcache::CacheData::Deallocate() {
+  std::cout << "[!] Deallocating for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
+
+  // free the cached copy itself, not the atomic that points to it,
+  // and only if it does not alias the source location (abort path)
+
+  uint8_t* cache = cache_->load();
+
+  if (cache != nullptr && cache != src_) {
+    numa_free(cache, size_);
+  }
+
+  cache_->store(nullptr);
+  incomplete_cache_ = nullptr;
+}
 
-inline void offcache::CacheCoordinator::SignalDataUnused(CacheTask* task) {
-  task->active_.store(false);
-}
+inline uint8_t* offcache::CacheData::GetDataLocation() const {
+  return cache_->load();
+}
+
+inline bool offcache::CacheData::Active() const {
+  return active_->load() > 0;
+}
 
-inline void offcache::CacheCoordinator::Flush() {
-  // TODO: there probably is a better way to implement this flush
+inline void offcache::Cache::Flush() {
+  std::cout << "[-] Flushing Cache" << std::endl;
+
+  // TODO: there is a better way to implement this flush
 
   {
     std::unique_lock<std::shared_mutex> lock(cache_mutex_);
@@ -326,8 +407,7 @@
     auto it = cache_state_.begin();
 
     while (it != cache_state_.end()) {
-      if (it->second->active_.load() == false) {
-        DestroyTask(it->second);
+      if (it->second.Active() == false) {
        cache_state_.erase(it);
        it = cache_state_.begin();
      }