gpt4 book ai didi

c++ - 使用 alignas 防止错误共享被破坏

转载 作者:行者123 更新时间:2023-12-03 06:50:22 26 4
gpt4 key购买 nike

我不习惯在互联网上发布任何问题,所以如果我做错了什么,请告诉我。
简而言之

  • 如何在 CPU 缓存行大小为 64 字节的 64 位架构上正确防止错误共享?
  • C++ 'alignas' 关键字和简单字节数组(例如:char[64])的使用如何影响多线程效率?

  • 语境
    在致力于非常有效地实现 Single Consumer Single Producer Queue 的同时,我在对我的代码进行基准测试时遇到了 GCC 编译器的不合逻辑行为。
    全文
    我希望有人有必要的知识来解释正在发生的事情。
    我目前在 arch linux 上使用 GCC 10.2.0 及其 C++ 20 实现。我的笔记本电脑是带有 i7-7500U 处理器的联想 T470S。
    让我从数据结构开始:
    class SPSCQueue
    {
    public:
    ...

    private:
    alignas(64) std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
    Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
    std::size_t _headCache { 0 }; // Head cache for the producer
    char _pad0[64 - sizeof(Buffer) - sizeof(std::size_t)]; // 64 bytes alignment padding

    alignas(64) std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
    Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer2
    std::size_t _tailCache { 0 }; // Head cache for the consumer
    char _pad1[64 - sizeof(Buffer) - sizeof(std::size_t)]; // 64 bytes alignment padding
    };
    以下数据结构在我的系统上推送/弹出时获得了快速而稳定的 20ns。
    然而, 只有使用以下成员更改对齐方式会使基准不稳定并在 20 到 30ns 之间。
        alignas(64) std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
    struct alignas(64) {
    Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
    std::size_t _headCache { 0 }; // Head cache for the producer
    };

    alignas(64) std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
    struct alignas(64) {
    Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer1
    std::size_t _tailCache { 0 }; // Tail cache for the consumer
    };
    最后,当我尝试这个配置给我 40 到 55ns 的结果时,我迷失了更多。

    std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
    char _pad0[64 - sizeof(std::atomic<size_t>)];
    Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
    std::size_t _headCache { 0 }; // Head cache for the producer
    char _pad1[64 - sizeof(Buffer) - sizeof(std::size_t)];

    std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
    char _pad2[64 - sizeof(std::atomic<size_t>)];
    Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer2
    std::size_t _tailCache { 0 }; // Head cache for the consumer
    char _pad3[64 - sizeof(Buffer) - sizeof(std::size_t)];
    这次我让队列推送/弹出在 40 到 55ns 之间振荡。
    在这一点上我很迷茫,因为我不知道我应该去哪里寻找答案。到目前为止,C++ 内存布局对我来说非常直观,但我意识到我仍然错过了在高频多线程方面做得更好的非常重要的知识。
    最少的代码示例
    如果你想编译整个代码来自己测试,这里需要几个文件:
    SPSCQueue.hpp:

    #pragma once

    #include <atomic>
    #include <cstdlib>
    #include <cinttypes>

    #define KF_ALIGN_CACHELINE alignas(kF::Core::Utils::CacheLineSize)

    namespace kF::Core
    {
    template<typename Type>
    class SPSCQueue;

    namespace Utils
    {
    /** @brief Helper used to perfect forward move / copy constructor */
    template<typename Type, bool ForceCopy = false>
    void ForwardConstruct(Type *dest, Type *source) {
    if constexpr (!ForceCopy && std::is_move_assignable_v<Type>)
    new (dest) Type(std::move(*source));
    else
    new (dest) Type(*source);
    }

    /** @brief Helper used to perfect forward move / copy assignment */
    template<typename Type, bool ForceCopy = false>
    void ForwardAssign(Type *dest, Type *source) {
    if constexpr (!ForceCopy && std::is_move_assignable_v<Type>)
    *dest = std::move(*source);
    else
    *dest = *source;
    }

    /** @brief Theorical cacheline size */
    constexpr std::size_t CacheLineSize = 64ul;
    }
    }

    /**
    * @brief The SPSC queue is a lock-free queue that only supports a Single Producer and a Single Consumer
    * The queue is really fast compared to other more flexible implementations because the fact that only two thread can simultaneously read / write
    * means that less synchronization is needed for each operation.
    * The queue supports ranged push / pop to insert multiple elements without performance impact
    *
    * @tparam Type to be inserted
    */
    template<typename Type>
    class kF::Core::SPSCQueue
    {
    public:
    /** @brief Buffer structure containing all cells */
    struct Buffer
    {
    Type *data { nullptr };
    std::size_t capacity { 0 };
    };

    /** @brief Local thread cache */
    struct Cache
    {
    Buffer buffer {};
    std::size_t value { 0 };
    };

    /** @brief Default constructor initialize the queue */
    SPSCQueue(const std::size_t capacity);

    /** @brief Destruct and release all memory (unsafe) */
    ~SPSCQueue(void) { clear(); std::free(_buffer.data); }

    /** @brief Push a single element into the queue
    * @return true if the element has been inserted */
    template<typename ...Args>
    [[nodiscard]] inline bool push(Args &&...args);

    /** @brief Pop a single element from the queue
    * @return true if an element has been extracted */
    [[nodiscard]] inline bool pop(Type &value);

    /** @brief Clear all elements of the queue (unsafe) */
    void clear(void);

    private:
    KF_ALIGN_CACHELINE std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
    struct {
    Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
    std::size_t _headCache { 0 }; // Head cache for the producer
    char _pad0[Utils::CacheLineSize - sizeof(Buffer) - sizeof(std::size_t)];
    };

    KF_ALIGN_CACHELINE std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
    struct{
    Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer2
    std::size_t _tailCache { 0 }; // Head cache for the consumer
    char _pad1[Utils::CacheLineSize - sizeof(Buffer) - sizeof(std::size_t)];
    };

    /** @brief Copy and move constructors disabled */
    SPSCQueue(const SPSCQueue &other) = delete;
    SPSCQueue(SPSCQueue &&other) = delete;
    };

    static_assert(sizeof(kF::Core::SPSCQueue<int>) == 4 * kF::Core::Utils::CacheLineSize);

    template<typename Type>
    kF::Core::SPSCQueue<Type>::SPSCQueue(const std::size_t capacity)
    {
    _buffer.capacity = capacity;
    if (_buffer.data = reinterpret_cast<Type *>(std::malloc(sizeof(Type) * capacity)); !_buffer.data)
    throw std::runtime_error("Core::SPSCQueue: Malloc failed");
    _buffer2 = _buffer;
    }

    template<typename Type>
    template<typename ...Args>
    bool kF::Core::SPSCQueue<Type>::push(Args &&...args)
    {
    static_assert(std::is_constructible<Type, Args...>::value, "Type must be constructible from Args...");

    const auto tail = _tail.load(std::memory_order_relaxed);
    auto next = tail + 1;

    if (next == _buffer.capacity) [[unlikely]]
    next = 0;
    if (auto head = _headCache; next == head) [[unlikely]] {
    head = _headCache = _head.load(std::memory_order_acquire);
    if (next == head) [[unlikely]]
    return false;
    }
    new (_buffer.data + tail) Type{ std::forward<Args>(args)... };
    _tail.store(next, std::memory_order_release);
    return true;
    }

    template<typename Type>
    bool kF::Core::SPSCQueue<Type>::pop(Type &value)
    {
    const auto head = _head.load(std::memory_order_relaxed);

    if (auto tail = _tailCache; head == tail) [[unlikely]] {
    tail = _tailCache = _tail.load(std::memory_order_acquire);
    if (head == tail) [[unlikely]]
    return false;
    }
    auto *elem = reinterpret_cast<Type *>(_buffer2.data + head);
    auto next = head + 1;
    if (next == _buffer2.capacity) [[unlikely]]
    next = 0;
    value = std::move(*elem);
    elem->~Type();
    _head.store(next, std::memory_order_release);
    return true;
    }

    template<typename Type>
    void kF::Core::SPSCQueue<Type>::clear(void)
    {
    for (Type type; pop(type););
    }
    基准,使用 google benchmark .
    bench_SPSCQueue.cpp:
    #include <thread>

    #include <benchmark/benchmark.h>

    #include "SPSCQueue.hpp"

    using namespace kF;

    using Queue = Core::SPSCQueue<std::size_t>;

    constexpr std::size_t Capacity = 4096;

    static void SPSCQueue_NoisyPush(benchmark::State &state)
    {
    Queue queue(Capacity);
    std::atomic<bool> running = true;
    std::size_t i = 0ul;
    std::thread thd([&queue, &running] { for (std::size_t tmp; running; benchmark::DoNotOptimize(queue.pop(tmp))); });
    for (auto _ : state) {
    decltype(std::chrono::high_resolution_clock::now()) start;
    do {
    start = std::chrono::high_resolution_clock::now();
    } while (!queue.push(42ul));
    auto end = std::chrono::high_resolution_clock::now();
    auto elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
    auto iterationTime = elapsed.count();
    state.SetIterationTime(iterationTime);
    }
    running = false;
    if (thd.joinable())
    thd.join();
    }
    BENCHMARK(SPSCQueue_NoisyPush)->UseManualTime();

    static void SPSCQueue_NoisyPop(benchmark::State &state)
    {
    Queue queue(Capacity);
    std::atomic<bool> running = true;
    std::size_t i = 0ul;
    std::thread thd([&queue, &running] { while (running) benchmark::DoNotOptimize(queue.push(42ul)); });
    for (auto _ : state) {
    std::size_t tmp;
    decltype(std::chrono::high_resolution_clock::now()) start;
    do {
    start = std::chrono::high_resolution_clock::now();
    } while (!queue.pop(tmp));
    auto end = std::chrono::high_resolution_clock::now();
    auto elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
    auto iterationTime = elapsed.count();
    state.SetIterationTime(iterationTime);
    }
    running = false;
    if (thd.joinable())
    thd.join();
    }
    BENCHMARK(SPSCQueue_NoisyPop)->UseManualTime();

    最佳答案

    感谢您的有用评论(主要是感谢 Peter Cordes),问题似乎来自 L2 数据预取器。
    由于我的 SPSC 队列设计,每个线程必须访问两个连续的缓存行来推送/弹出队列。
    如果结构本身未与 128 字节对齐,则其地址将不会在 128 字节上对齐,并且编译器将无法优化两个对齐缓存行的访问。
    因此,简单的修复是:

    template<typename Type>
    class alignas(128) SPSCQueue { ... };
    Here (section 2.5.5.4 Data Prefetching)是 Intel 的一篇有趣的论文,解释了对其架构的优化以及如何在不同级别的缓存中完成预取。

    关于c++ - 使用 alignas 防止错误共享被破坏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63706666/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com