gpt4 book ai didi

c++ - 避免 SPSC 队列索引的错误共享

转载 作者:行者123 更新时间:2023-12-01 14:48:00 27 4
gpt4 key购买 nike

让我们想象一个无锁并发 SPSC(单生产者/单消费者)队列。

  • 生产者线程 读取 head , tail , cached_tail head , cached_tail .
  • 消费者线程 读取 head , tail , cached_head tail , cached head .

  • 请注意, cached_tail只能由生产者线程访问,就像 cached_head仅由消费者线程访问。它们可以被认为是私有(private)线程局部变量,因此它们是不同步的,因此没有定义为原子的。

    队列的数据布局如下:
    #include <atomic>
    #include <cstddef>
    #include <thread>

    struct spsc_queue
    {
    /// ...

    // Producer variables
    alignas(std::hardware_destructive_interference_size) std::atomic<size_t> head; // shared
    size_t cached_tail; // non-shared

    // Consumer variables
    alignas(std::hardware_destructive_interference_size) std::atomic<size_t> tail; // shared
    size_t cached_head; // non-shared

    std::byte padding[std::hardware_destructive_interference_size - sizeof(tail) - sizeof(cached_head)];
    };

    因为我想避免 虚假分享 , 我对齐了 headtail到 L1 缓存行大小。
    push 的伪代码实现/ pop操作如下:
    bool push(const void* elems, size_t n)
    {
    size_t h = atomic_load(head, relaxed);

    if (num_remaining_storage(h, cached_tail) < n)
    {
    cached_tail = atomic_load(tail, acquire);

    if (num_remaining_storage(h, cached_tail) < n)
    return false;
    }

    // write from elems

    atomic_store(head, h + n, release);
    return true;
    }

    bool pop(void* elems, size_t n)
    {
    size_t t = atomic_load(tail, relaxed);

    if (num_stored_elements(cached_head, t) < n)
    {
    cached_head = atomic_load(head, acquire);

    if (num_stored_elements(cached_head, t) < n)
    return false;
    }

    // read to elems

    atomic_store(tail, t + n, release);
    return true;
    }

    void wait_and_push(const void* elems, size_t n)
    {
    size_t h = atomic_load(head, relaxed);

    while (num_remaining_storage(h, cached_tail) < n)
    cached_tail = atomic_load(tail, acquire);

    // write from elems

    atomic_store(head, h + n, release);
    }

    void wait_and_pop(void* elems, size_t n)
    {
    size_t t = atomic_load(tail, relaxed);

    while (num_stored_elements(cached_head, t) < n)
    cached_head = atomic_load(head, acquire);

    // write to elems

    atomic_store(tail, t + n, release);
    }

    在初始化时(此处未列出),所有索引都设置为 0 .
    功能 num_remaining_storagenum_stored_elementsconst基于传递的参数和不可变队列容量执行简单计算的函数 - 它们不执行任何原子读取或写入。

    现在的问题是 : 我需要对齐 cached_tailcached_head以及完全避免错误共享任何索引,或者它是可以的。由于 cached_tail是生产者私有(private)的, cached_head是消费者私有(private)的,我认为 cached_tail可以与 head 位于同一缓存行中(生产者缓存行),就像 cached_head在与 tail 相同的高速缓存行中(消费者缓存行)不会发生虚假共享。

    我错过了什么吗?

    最佳答案

    感谢您提供伪代码 - 它仍然缺少一些细节,但我想我明白了基本的想法。你有一个有界的 SPSC 队列,索引可以环绕,你使用 cached_tail push 中的变量检查是否有空闲插槽,这样可以避免加载 tail从可能无效的高速缓存行(pop 反之亦然)。

    我建议把 headcached_tail彼此相邻(即在同一高速缓存行上)和 tailcached_head在另一个上。 push总是读取两个变量 - headcached_tail ,因此将它们靠近在一起是有意义的。 cached_tail只有在没有更多空闲插槽并且我们必须重新加载 tail 时才会更新.

    您的代码在细节上有点薄,但似乎还有一些优化空间:

    bool push(const void* elems, size_t n)
    {
    size_t h = atomic_load(head);

    if (num_remaining_storage(h, cached_tail) < n)
    {
    auto t = atomic_load(tail);
    if (t == cached_tail)
    return false;

    // we only have to update our cached_tail if the reloaded value
    // is different - and in this case it is guaranteed that there
    // is a free slot, so we don't have to perform a recheck.
    cached_tail = t;
    }

    // write from elems

    atomic_store(head, h + n);
    return true;
    }

    那样 cached_tail仅在 head 时更新也会更新,因此这是它们位于同一缓存行上的另一个原因。当然,同样的优化可以应用于 pop。也是。

    这正是我想看一些代码的原因,因为访问模式对于确定哪些变量应该共享一个缓存行和哪些不应该是至关重要的。

    关于c++ - 避免 SPSC 队列索引的错误共享,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61507688/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com