diff --git a/.gitignore b/.gitignore index ab3553e..55c6836 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,8 @@ *.fls */.vscode/* +*/.idea/* +*/cmake-build-*/* # ---> C++ # Prerequisites diff --git a/offloading-cacher/CMakeLists.txt b/offloading-cacher/CMakeLists.txt new file mode 100755 index 0000000..7b4844a --- /dev/null +++ b/offloading-cacher/CMakeLists.txt @@ -0,0 +1,19 @@ +cmake_minimum_required(VERSION 3.18) + +project(offloading-cacher) + +set(CMAKE_CXX_STANDARD 20) + +list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules") + +find_package(NUMA REQUIRED) + +set(DML_SOURCE_DIR "../../DML/include/") +set(SOURCES main.cpp) + +add_executable(offloading-cacher ${SOURCES}) + +target_include_directories(offloading-cacher PRIVATE ${CMAKE_SOURCE_DIR} ${NUMA_INCLUDE_DIRS} ${DML_SOURCE_DIR}) +target_link_libraries(offloading-cacher PRIVATE libdml.a pthread ${CMAKE_DL_LIBS} ${NUMA_LIBRARY}) + +install(TARGETS offloading-cacher DESTINATION ${CMAKE_INSTALL_PREFIX}) diff --git a/offloading-cacher/benchmark.hpp b/offloading-cacher/benchmark.hpp new file mode 100644 index 0000000..550efc2 --- /dev/null +++ b/offloading-cacher/benchmark.hpp @@ -0,0 +1,174 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include "util/barrier.hpp" +#include "util/dml-helper.hpp" +#include "util/task-data.hpp" + +#define LOG_CODE_INFO "Location: " << __FILE__ << "@" << __LINE__ << "::" << __FUNCTION__ << std::endl +#define LOG_ERR { pthread_t t = pthread_self(); std::cerr << "--- BEGIN ERROR MSG ---" << std::endl << "Physical: [Node " << args->numa_node << " | Thread " << t << "]" << std::endl; } std::cerr << LOG_CODE_INFO +#define CHECK_STATUS(status,msg) { if (status != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(status) << std::endl << msg << std::endl; args->status = status; return nullptr; }} + +#define ADD_TIMING_MESSUREMENT { if (i >= 5) { 
args->submit_duration.emplace_back(std::chrono::duration_cast(se - st).count()); args->complete_duration.emplace_back(std::chrono::duration_cast(et - se).count()); args->combined_duration.emplace_back(std::chrono::duration_cast(et - st).count());}} +template +void* thread_function(void* argp) { + TaskData* args = reinterpret_cast(argp); + + // set numa node and core affinity of the current thread + numa_run_on_node(args->numa_node); + + // allocate memory for the move operation on the requested numa nodes + void* src = numa_alloc_onnode(args->size, args->nnode_src); + void* dst = numa_alloc_onnode(args->size, args->nnode_dst); + dml::data_view srcv = dml::make_view(reinterpret_cast(src), args->size); + dml::data_view dstv = dml::make_view(reinterpret_cast(dst), args->size); + + std::memset(src, 0, args->size); + std::memset(dst, 0, args->size); + + args->status = dml::status_code::ok; + args->rep_completed = 0; + + std::chrono::time_point tps; + + // we add 5 as the first 5 iterations will not be measured + // to remove exceptional values encountered during warmup + for (uint32_t i = 0; i < args->rep_count + 5; i++) { + // synchronize the start of each iteration + // using the barrier structure + args->barrier_->wait(); + + if (args->batch_submit) { + const auto st = std::chrono::steady_clock::now(); + + auto sequence = dml::sequence(args->batch_size, std::allocator()); + + for (uint32_t j = 0; j < args->batch_size; j++) { + // block_on_fault() is required to submit the task in a way so that the + // DSA engine can handle page faults itself together with the IOMMU which + // requires the WQ to be configured to allow this too + + const auto status = sequence.add(dml::mem_copy.block_on_fault(), srcv, dstv); + CHECK_STATUS(status, "Adding operation to batch failed!"); + } + + // we use the asynchronous submit-routine even though this is not required + // here, however the project later on will only use async operation and + // therefore this behaviour should be
benchmarked + + auto handler = dml::submit(dml::batch, sequence); + + const auto se = std::chrono::steady_clock::now(); + + auto result = handler.get(); + + const auto et = std::chrono::steady_clock::now(); + + const dml::status_code status = result.status; + CHECK_STATUS(status, "Batch completed with an Error!"); + + ADD_TIMING_MESSUREMENT; + } + else if (args->batch_size > 1) { + // implementation for non-batched batch submit follows here + // this means we submit a bunch of work as single descriptors + // but then dont wait for the completion immediately + + std::vector>> handlers; + + const auto st = std::chrono::steady_clock::now(); + + for (uint32_t j = 0; j < args->batch_size; j++) { + // block_on_fault() is required to submit the task in a way so that the + // DSA engine can handle page faults itself together with the IOMMU which + // requires the WQ to be configured to allow this too + + handlers.emplace_back(dml::submit(dml::mem_copy.block_on_fault(), srcv, dstv)); + } + + const auto se = std::chrono::steady_clock::now(); + + for (auto& handler : handlers) { + auto result = handler.get(); + const dml::status_code status = result.status; + CHECK_STATUS(status, "Operation completed with an Error!"); + } + + const auto et = std::chrono::steady_clock::now(); + + ADD_TIMING_MESSUREMENT; + } + else { + const auto st = std::chrono::steady_clock::now(); + + // we use the asynchronous submit-routine even though this is not required + // here, however the project later on will only use async operation and + // therefore this behaviour should be benchmarked + // block_on_fault() is required to submit the task in a way so that the + // DSA engine can handle page faults itself together with the IOMMU which + // requires the WQ to be configured to allow this too + auto handler = dml::submit(dml::mem_copy.block_on_fault(), srcv, dstv); + + const auto se = std::chrono::steady_clock::now(); + + auto result = handler.get(); + + const auto et = 
std::chrono::steady_clock::now(); + + const dml::status_code status = result.status; + CHECK_STATUS(status, "Operation completed with an Error!"); + + ADD_TIMING_MESSUREMENT; + } + + // again: we do not count the first 5 repetitions + if (i == 5) tps = std::chrono::steady_clock::now(); + if (i >= 5) args->rep_completed++; + } + + const auto tpe = std::chrono::steady_clock::now(); + + args->total_time = std::chrono::duration_cast(tpe - tps).count(); + + // free the allocated memory regions on the selected nodes + numa_free(src, args->size); + numa_free(dst, args->size); + + return nullptr; +} + +template +void execute_dml_memcpy(std::vector& args) { + barrier task_barrier(args.size()); + std::vector threads; + + // initialize numa library + numa_available(); + + // for each submitted task we link the semaphore + // and create the thread, passing the argument + for (auto& arg : args) { + arg.barrier_ = &task_barrier; + threads.emplace_back(); + + if (pthread_create(&threads.back(), nullptr, thread_function, &arg) != 0) { + std::cerr << "Error creating thread" << std::endl; + exit(1); + } + } + + for (pthread_t& t : threads) { + pthread_join(t, nullptr); + } +} \ No newline at end of file diff --git a/offloading-cacher/cmake/modules/FindNUMA.cmake b/offloading-cacher/cmake/modules/FindNUMA.cmake new file mode 100644 index 0000000..94b23c8 --- /dev/null +++ b/offloading-cacher/cmake/modules/FindNUMA.cmake @@ -0,0 +1,43 @@ +# Module for locating libnuma +# +# Read-only variables: +# NUMA_FOUND +# Indicates that the library has been found. +# +# NUMA_INCLUDE_DIR +# Points to the libnuma include directory. +# +# NUMA_LIBRARY_DIR +# Points to the directory that contains the libraries. +# The content of this variable can be passed to link_directories. +# +# NUMA_LIBRARY +# Points to the libnuma that can be passed to target_link_libraries.
+# +# Copyright (c) 2013-2020 MulticoreWare, Inc + +include(FindPackageHandleStandardArgs) + +find_path(NUMA_ROOT_DIR + NAMES include/numa.h + PATHS ENV NUMA_ROOT + DOC "NUMA root directory") + +find_path(NUMA_INCLUDE_DIR + NAMES numa.h + HINTS ${NUMA_ROOT_DIR} + PATH_SUFFIXES include + DOC "NUMA include directory") + +find_library(NUMA_LIBRARY + NAMES numa + HINTS ${NUMA_ROOT_DIR} + DOC "NUMA library") + +if (NUMA_LIBRARY) + get_filename_component(NUMA_LIBRARY_DIR ${NUMA_LIBRARY} PATH) +endif() + +mark_as_advanced(NUMA_INCLUDE_DIR NUMA_LIBRARY_DIR NUMA_LIBRARY) + +find_package_handle_standard_args(NUMA REQUIRED_VARS NUMA_ROOT_DIR NUMA_INCLUDE_DIR NUMA_LIBRARY) \ No newline at end of file diff --git a/offloading-cacher/main.cpp b/offloading-cacher/main.cpp new file mode 100644 index 0000000..f49f1f1 --- /dev/null +++ b/offloading-cacher/main.cpp @@ -0,0 +1,42 @@ +#include + +#include +#include +#include + +#include "benchmark.hpp" + +int main(int argc, char **argv) { + if (argc < 3) { + std::cout << "Missing input and output file names." << std::endl; + std::cout << "Usage: ./benchmarks [input.json] [output.json]" << std::endl; + return 1; + } + + const std::string input = argv[1]; + const std::string output = argv[2]; + + std::string path; + std::vector args; + + std::ifstream is(input); + ReadWorkDescription(args, path, is); + is.close(); + + if (path == "hw") { + execute_dml_memcpy(args); + } + else if (path == "sw") { + execute_dml_memcpy(args); + } + else if (path == "auto") { + execute_dml_memcpy(args); + } + else { + std::cerr << "Path is neither hw/sw/auto." 
<< std::endl; + } + + std::ofstream os(output); + WriteResultLog(args, path, os); + os.close(); +} diff --git a/offloading-cacher/offloading-cache.hpp b/offloading-cacher/offloading-cache.hpp new file mode 100644 index 0000000..613d498 --- /dev/null +++ b/offloading-cacher/offloading-cache.hpp @@ -0,0 +1,84 @@ +#pragma once + +#include +#include +#include + +#include + +#include + +namespace offcache { + // the cache task structure will be used to submit and + // control a cache element, while providing source pointer + // and size in bytes for submission + // + // then the submitting thread may wait on the atomic "result" + // which will be notified by the cache worker upon processing + // after which the atomic-bool-ptr active will also become valid + // + // the data pointed to by result and the bool-ptr are guaranteed + // to remain valid until the value pointed to by active is changed + // to false, after which the worker may clean up and delete the + // structure - careful, do not call delete on this, the worker does + struct CacheTask { + uint8_t* data_; + size_t size_; + std::atomic result_ { nullptr }; + std::atomic* active_; + }; + + // worker class, one for each numa node + // discovers its node configuration on startup + // and keeps track of available memory + class CacheWorker { + private: + uint8_t numa_node_ = 0; + + std::unordered_map cache_info_; + + public: + // this is the mailbox of the worker to which a new task + // may be submitted by exchanging nullptr with a valid one + // and notifying on the atomic after which ownership + // of the CacheTask structure is transferred to the worker + std::atomic* task_slot_ = nullptr; + + static void run(CacheWorker* this_, const uint8_t numa_node); + }; + + // singleton which holds the cache workers + // and is the place where work will be submitted + class CacheCoordinator { + public: + // cache policy is defined as a type here to allow flexible usage of the cacher + // given a numa destination node
(where the data will be needed), the numa source + // node (current location of the data) and the data size, this function should + // return optimal cache placement + // dst node and returned value can differ if the system, for example, has HBM + // attached accessible directly to node n under a different node id m + typedef uint8_t (CachePolicy)(const uint8_t numa_dst_node, const uint8_t numa_src_node, const size_t data_size); + + // copy policy specifies the copy-executing nodes for a given task + // which allows flexibility in assignment for optimizing raw throughput + // or choosing a conservative usage policy + typedef std::vector (CopyPolicy)(const uint8_t numa_dst_node, const uint8_t numa_src_node); + + enum class ExecutionPolicy { + Immediate, Relaxed, NoCache + }; + + private: + CachePolicy* cache_policy_function_ = nullptr; + CopyPolicy* copy_policy_function_ = nullptr; + + public: + void Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function); + + // submits the given task and takes ownership of the pointer + void SubmitTask(CacheTask* task, const ExecutionPolicy policy) const; + + static void WaitOnCompletion(CacheTask* task); + static void SignalDataUnused(CacheTask* task); + }; +} \ No newline at end of file diff --git a/offloading-cacher/util/dml-helper.hpp b/offloading-cacher/util/dml-helper.hpp new file mode 100644 index 0000000..1686fd1 --- /dev/null +++ b/offloading-cacher/util/dml-helper.hpp @@ -0,0 +1,26 @@ +#pragma once + +#include + +inline const std::string StatusCodeToString(const dml::status_code code) { + switch(code) { + case dml::status_code::ok: return "ok"; + case dml::status_code::false_predicate: return "false predicate"; + case dml::status_code::partial_completion: return "partial completion"; + case dml::status_code::nullptr_error: return "nullptr error"; + case dml::status_code::bad_size: return "bad size"; + case dml::status_code::bad_length: return "bad length"; + case dml::status_code::inconsistent_size: 
return "inconsistent size"; + case dml::status_code::dualcast_bad_padding: return "dualcast bad padding"; + case dml::status_code::bad_alignment: return "bad alignment"; + case dml::status_code::buffers_overlapping: return "buffers overlapping"; + case dml::status_code::delta_delta_empty: return "delta delta empty"; + case dml::status_code::batch_overflow: return "batch overflow"; + case dml::status_code::execution_failed: return "execution failed"; + case dml::status_code::unsupported_operation: return "unsupported operation"; + case dml::status_code::queue_busy: return "queue busy"; + case dml::status_code::error: return "unknown error"; + case dml::status_code::config_error: return "config error"; + default: return "unhandled error"; + } +} \ No newline at end of file