
add project 'offloading cacher' and function declarations for the cacher in its header file

master
Constantin Fürst 12 months ago
commit f91cd3202f
  1. .gitignore (+2)
  2. offloading-cacher/CMakeLists.txt (+19)
  3. offloading-cacher/benchmark.hpp (+174)
  4. offloading-cacher/cmake/modules/FindNUMA.cmake (+43)
  5. offloading-cacher/main.cpp (+42)
  6. offloading-cacher/offloading-cache.hpp (+84)
  7. offloading-cacher/util/dml-helper.hpp (+26)

.gitignore (+2)

@@ -13,6 +13,8 @@
 *.fls
 */.vscode/*
+*/.idea/*
+*/cmake-build-*/*
 # ---> C++
 # Prerequisites

offloading-cacher/CMakeLists.txt (+19)

@@ -0,0 +1,19 @@
cmake_minimum_required(VERSION 3.18)
project(offloading-cacher)
set(CMAKE_CXX_STANDARD 20)
list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules")
find_package(NUMA REQUIRED)
set(DML_SOURCE_DIR "../../DML/include/")
set(SOURCES main.cpp)
add_executable(offloading-cacher ${SOURCES})
target_include_directories(offloading-cacher PRIVATE ${CMAKE_SOURCE_DIR} ${NUMA_INCLUDE_DIRS} ${DML_SOURCE_DIR})
target_link_libraries(offloading-cacher PRIVATE libdml.a pthread ${CMAKE_DL_LIBS} ${NUMA_LIBRARY})
install(TARGETS offloading-cacher DESTINATION ${CMAKE_INSTALL_PREFIX})

offloading-cacher/benchmark.hpp (+174)

@@ -0,0 +1,174 @@
#pragma once

#include <iostream>
#include <vector>
#include <chrono>
#include <cstring>
#include <numeric>

#include <pthread.h>
#include <semaphore.h>
#include <numa.h>

#include <dml/dml.hpp>

#include "util/barrier.hpp"
#include "util/dml-helper.hpp"
#include "util/task-data.hpp"

#define LOG_CODE_INFO "Location: " << __FILE__ << "@" << __LINE__ << "::" << __FUNCTION__ << std::endl
#define LOG_ERR { pthread_t t = pthread_self(); std::cerr << "--- BEGIN ERROR MSG ---" << std::endl << "Physical: [Node " << args->numa_node << " | Thread " << t << "]" << std::endl; } std::cerr << LOG_CODE_INFO
#define CHECK_STATUS(status,msg) { if (status != dml::status_code::ok) { LOG_ERR << "Status Code: " << StatusCodeToString(status) << std::endl << msg << std::endl; args->status = status; return nullptr; }}
#define ADD_TIMING_MESSUREMENT { if (i >= 5) { args->submit_duration.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(se - st).count()); args->complete_duration.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(et - se).count()); args->combined_duration.emplace_back(std::chrono::duration_cast<std::chrono::nanoseconds>(et - st).count());}}
template <typename path>
void* thread_function(void* argp) {
    TaskData* args = reinterpret_cast<TaskData*>(argp);

    // set numa node and core affinity of the current thread
    numa_run_on_node(args->numa_node);

    // allocate memory for the move operation on the requested numa nodes
    void* src = numa_alloc_onnode(args->size, args->nnode_src);
    void* dst = numa_alloc_onnode(args->size, args->nnode_dst);
    dml::data_view srcv = dml::make_view(reinterpret_cast<uint8_t*>(src), args->size);
    dml::data_view dstv = dml::make_view(reinterpret_cast<uint8_t*>(dst), args->size);

    std::memset(src, 0, args->size);
    std::memset(dst, 0, args->size);

    args->status = dml::status_code::ok;
    args->rep_completed = 0;

    std::chrono::time_point<std::chrono::steady_clock> tps;

    // we add 5 as the first 5 iterations will not be measured
    // to remove exceptional values encountered during warmup
    for (uint32_t i = 0; i < args->rep_count + 5; i++) {
        // synchronize the start of each iteration
        // using the barrier structure
        args->barrier_->wait();

        if (args->batch_submit) {
            const auto st = std::chrono::steady_clock::now();

            auto sequence = dml::sequence(args->batch_size, std::allocator<dml::byte_t>());

            for (uint32_t j = 0; j < args->batch_size; j++) {
                // block_on_fault() is required to submit the task in a way that lets the
                // DSA engine handle page faults itself together with the IOMMU, which
                // requires the WQ to be configured to allow this too
                const auto status = sequence.add(dml::mem_copy.block_on_fault(), srcv, dstv);
                CHECK_STATUS(status, "Adding operation to batch failed!");
            }

            // we use the asynchronous submit routine even though it is not required
            // here; the project will later only use async operation, so this
            // behaviour should be benchmarked
            auto handler = dml::submit<path>(dml::batch, sequence);

            const auto se = std::chrono::steady_clock::now();

            auto result = handler.get();

            const auto et = std::chrono::steady_clock::now();

            const dml::status_code status = result.status;
            CHECK_STATUS(status, "Batch completed with an Error!");

            ADD_TIMING_MESSUREMENT;
        }
        else if (args->batch_size > 1) {
            // implementation for non-batched batch submit follows here:
            // we submit a bunch of work as single descriptors
            // but do not wait for their completion immediately

            std::vector<dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>>> handlers;

            const auto st = std::chrono::steady_clock::now();

            for (uint32_t j = 0; j < args->batch_size; j++) {
                // block_on_fault() is required to submit the task in a way that lets the
                // DSA engine handle page faults itself together with the IOMMU, which
                // requires the WQ to be configured to allow this too
                handlers.emplace_back(dml::submit<path>(dml::mem_copy.block_on_fault(), srcv, dstv));
            }

            const auto se = std::chrono::steady_clock::now();

            for (auto& handler : handlers) {
                auto result = handler.get();
                const dml::status_code status = result.status;
                CHECK_STATUS(status, "Operation completed with an Error!");
            }

            const auto et = std::chrono::steady_clock::now();

            ADD_TIMING_MESSUREMENT;
        }
        else {
            const auto st = std::chrono::steady_clock::now();

            // we use the asynchronous submit routine even though it is not required
            // here; the project will later only use async operation, so this
            // behaviour should be benchmarked
            // block_on_fault() is required to submit the task in a way that lets the
            // DSA engine handle page faults itself together with the IOMMU, which
            // requires the WQ to be configured to allow this too
            auto handler = dml::submit<path>(dml::mem_copy.block_on_fault(), srcv, dstv);

            const auto se = std::chrono::steady_clock::now();

            auto result = handler.get();

            const auto et = std::chrono::steady_clock::now();

            const dml::status_code status = result.status;
            CHECK_STATUS(status, "Operation completed with an Error!");

            ADD_TIMING_MESSUREMENT;
        }

        // again: we do not count the first 5 repetitions
        if (i == 5) tps = std::chrono::steady_clock::now();
        if (i >= 5) args->rep_completed++;
    }

    const auto tpe = std::chrono::steady_clock::now();

    args->total_time = std::chrono::duration_cast<std::chrono::nanoseconds>(tpe - tps).count();

    // free the allocated memory regions on the selected nodes
    numa_free(src, args->size);
    numa_free(dst, args->size);

    return nullptr;
}
template <typename path>
void execute_dml_memcpy(std::vector<TaskData>& args) {
    barrier task_barrier(args.size());
    std::vector<pthread_t> threads;

    // initialize numa library
    numa_available();

    // for each submitted task we link the shared barrier
    // and create the thread, passing the argument
    for (auto& arg : args) {
        arg.barrier_ = &task_barrier;
        threads.emplace_back();

        if (pthread_create(&threads.back(), nullptr, thread_function<path>, &arg) != 0) {
            std::cerr << "Error creating thread" << std::endl;
            exit(1);
        }
    }

    for (pthread_t& t : threads) {
        pthread_join(t, nullptr);
    }
}
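Note: util/barrier.hpp and util/task-data.hpp are included above but are not part of this commit. Based only on the fields accessed in thread_function and execute_dml_memcpy, TaskData presumably carries roughly the members sketched below; the member types are assumptions, not the actual definition.

// sketch only: inferred from the accesses above, not the actual util/task-data.hpp
#include <cstdint>
#include <cstddef>
#include <vector>

#include <dml/dml.hpp>

#include "util/barrier.hpp"

struct TaskData {
    // thread and buffer placement, read from the input description
    int numa_node;                            // node the worker thread is pinned to
    int nnode_src;                            // node of the source allocation
    int nnode_dst;                            // node of the destination allocation
    size_t size;                              // size of the copied region in bytes

    // submission configuration
    bool batch_submit;                        // submit one dml::batch sequence per iteration
    uint32_t batch_size;                      // number of descriptors per iteration
    uint32_t rep_count;                       // measured repetitions (plus 5 warmup iterations)

    // shared state and results written by the benchmark
    barrier* barrier_;                        // synchronizes the start of each iteration
    dml::status_code status;
    uint32_t rep_completed;
    std::vector<uint64_t> submit_duration;    // nanoseconds, st -> se
    std::vector<uint64_t> complete_duration;  // nanoseconds, se -> et
    std::vector<uint64_t> combined_duration;  // nanoseconds, st -> et
    uint64_t total_time;                      // nanoseconds over all measured iterations
};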

offloading-cacher/cmake/modules/FindNUMA.cmake (+43)

@@ -0,0 +1,43 @@
# Module for locating libnuma
#
# Read-only variables:
#   NUMA_FOUND
#     Indicates that the library has been found.
#
#   NUMA_INCLUDE_DIR
#     Points to the libnuma include directory.
#
#   NUMA_LIBRARY_DIR
#     Points to the directory that contains the libraries.
#     The content of this variable can be passed to link_directories.
#
#   NUMA_LIBRARY
#     Points to the libnuma that can be passed to target_link_libraries.
#
# Copyright (c) 2013-2020 MulticoreWare, Inc

include(FindPackageHandleStandardArgs)

find_path(NUMA_ROOT_DIR
  NAMES include/numa.h
  PATHS ENV NUMA_ROOT
  DOC "NUMA root directory")

find_path(NUMA_INCLUDE_DIR
  NAMES numa.h
  HINTS ${NUMA_ROOT_DIR}
  PATH_SUFFIXES include
  DOC "NUMA include directory")

find_library(NUMA_LIBRARY
  NAMES numa
  HINTS ${NUMA_ROOT_DIR}
  DOC "NUMA library")

if (NUMA_LIBRARY)
  get_filename_component(NUMA_LIBRARY_DIR ${NUMA_LIBRARY} PATH)
endif()

mark_as_advanced(NUMA_INCLUDE_DIR NUMA_LIBRARY_DIR NUMA_LIBRARY)

find_package_handle_standard_args(NUMA REQUIRED_VARS NUMA_ROOT_DIR NUMA_INCLUDE_DIR NUMA_LIBRARY)

offloading-cacher/main.cpp (+42)

@@ -0,0 +1,42 @@
#include <dml/dml.hpp>

#include <vector>
#include <iostream>
#include <fstream>

#include "benchmark.hpp"

int main(int argc, char **argv) {
    if (argc < 3) {
        std::cout << "Missing input and output file names." << std::endl;
        std::cout << "Usage: ./benchmarks [input.json] [output.json]" << std::endl;
        return 1;
    }

    const std::string input = argv[1];
    const std::string output = argv[2];

    std::string path;
    std::vector<TaskData> args;

    std::ifstream is(input);
    ReadWorkDescription(args, path, is);
    is.close();

    if (path == "hw") {
        execute_dml_memcpy<dml::hardware>(args);
    }
    else if (path == "sw") {
        execute_dml_memcpy<dml::software>(args);
    }
    else if (path == "auto") {
        execute_dml_memcpy<dml::automatic>(args);
    }
    else {
        std::cerr << "Path is neither hw/sw/auto." << std::endl;
    }

    std::ofstream os(output);
    WriteResultLog(args, path, os);
    os.close();
}

offloading-cacher/offloading-cache.hpp (+84)

@@ -0,0 +1,84 @@
#pragma once

#include <atomic>
#include <cstdint>
#include <cstddef>
#include <vector>
#include <unordered_map>

#include <numa.h>

#include <dml/dml.hpp>

namespace offcache {
    // the cache task structure will be used to submit and
    // control a cache element, while providing source pointer
    // and size in bytes for submission
    //
    // then the submitting thread may wait on the atomic "result"
    // which will be notified by the cache worker upon processing
    // after which the atomic-bool-ptr active will also become valid
    //
    // the data pointed to by result and the bool-ptr are guaranteed
    // to remain valid until the value pointed to by active is changed
    // to false, after which the worker may clean up and delete the
    // structure - careful: do not call delete on this yourself, the worker does
    struct CacheTask {
        uint8_t* data_;
        size_t size_;
        std::atomic<uint8_t*> result_ { nullptr };
        std::atomic<bool>* active_;
    };

    // worker class, one for each numa node
    // discovers its node configuration on startup
    // and keeps track of available memory
    class CacheWorker {
    private:
        uint8_t numa_node_ = 0;

        std::unordered_map<uint8_t*, CacheTask*> cache_info_;

    public:
        // this is the mailbox of the worker to which a new task
        // may be submitted by exchanging nullptr with a valid one
        // and notifying on the atomic, after which ownership
        // of the CacheTask structure is transferred to the worker
        std::atomic<CacheTask*>* task_slot_ = nullptr;

        static void run(CacheWorker* this_, const uint8_t numa_node);
    };

    // singleton which holds the cache workers
    // and is the place where work will be submitted
    class CacheCoordinator {
    public:
        // cache policy is defined as a type here to allow flexible usage of the cacher
        // given a numa destination node (where the data will be needed), the numa source
        // node (current location of the data) and the data size, this function should
        // return the optimal cache placement
        // dst node and returned value can differ if the system, for example, has HBM
        // attached to node n but accessible under a different node id m
        typedef uint8_t (CachePolicy)(const uint8_t numa_dst_node, const uint8_t numa_src_node, const size_t data_size);

        // copy policy specifies the copy-executing nodes for a given task,
        // which allows flexibility in assignment for optimizing raw throughput
        // or choosing a conservative usage policy
        typedef std::vector<uint8_t> (CopyPolicy)(const uint8_t numa_dst_node, const uint8_t numa_src_node);

        enum class ExecutionPolicy {
            Immediate, Relaxed, NoCache
        };

    private:
        CachePolicy* cache_policy_function_ = nullptr;
        CopyPolicy* copy_policy_function_ = nullptr;

    public:
        void Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function);

        // submits the given task and takes ownership of the pointer
        void SubmitTask(CacheTask* task, const ExecutionPolicy policy) const;

        static void WaitOnCompletion(CacheTask* task);

        static void SignalDataUnused(CacheTask* task);
    };
}
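Note: this commit only adds declarations for the cacher; none of the member functions are implemented yet. The sketch below is a hypothetical caller-side view of the intended protocol, pieced together from the comments above. SimpleCachePolicy, SimpleCopyPolicy and ExampleUsage are placeholder names and not part of the interface.

// caller-side sketch of the intended protocol, based on the header comments above;
// the policy functions are placeholders, not part of this commit
#include <cstdint>
#include <cstddef>
#include <vector>

#include "offloading-cache.hpp"

uint8_t SimpleCachePolicy(const uint8_t numa_dst_node, const uint8_t numa_src_node, const size_t data_size) {
    return numa_dst_node;  // place the cached copy directly on the requesting node
}

std::vector<uint8_t> SimpleCopyPolicy(const uint8_t numa_dst_node, const uint8_t numa_src_node) {
    return { numa_dst_node };  // let the destination node perform the copy
}

void ExampleUsage(offcache::CacheCoordinator& coordinator, uint8_t* data, size_t size) {
    coordinator.Init(SimpleCachePolicy, SimpleCopyPolicy);

    // ownership of the task passes to the cache on submission,
    // so the caller must not delete it
    auto* task = new offcache::CacheTask { data, size };
    coordinator.SubmitTask(task, offcache::CacheCoordinator::ExecutionPolicy::Relaxed);

    // block until the worker has published the cached copy in "result_"
    offcache::CacheCoordinator::WaitOnCompletion(task);
    uint8_t* cached = task->result_.load();

    // ... use "cached" ...

    // signal that the cached data is no longer needed so the
    // worker may clean up and delete the task structure
    offcache::CacheCoordinator::SignalDataUnused(task);
}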

offloading-cacher/util/dml-helper.hpp (+26)

@@ -0,0 +1,26 @@
#pragma once

#include <string>

#include <dml/dml.hpp>

inline const std::string StatusCodeToString(const dml::status_code code) {
    switch (code) {
        case dml::status_code::ok: return "ok";
        case dml::status_code::false_predicate: return "false predicate";
        case dml::status_code::partial_completion: return "partial completion";
        case dml::status_code::nullptr_error: return "nullptr error";
        case dml::status_code::bad_size: return "bad size";
        case dml::status_code::bad_length: return "bad length";
        case dml::status_code::inconsistent_size: return "inconsistent size";
        case dml::status_code::dualcast_bad_padding: return "dualcast bad padding";
        case dml::status_code::bad_alignment: return "bad alignment";
        case dml::status_code::buffers_overlapping: return "buffers overlapping";
        case dml::status_code::delta_delta_empty: return "delta delta empty";
        case dml::status_code::batch_overflow: return "batch overflow";
        case dml::status_code::execution_failed: return "execution failed";
        case dml::status_code::unsupported_operation: return "unsupported operation";
        case dml::status_code::queue_busy: return "queue busy";
        case dml::status_code::error: return "unknown error";
        case dml::status_code::config_error: return "config error";
        default: return "unhandled error";
    }
}