gpt4 book ai didi

c++ - 在并发队列中覆盖

转载 作者:太空宇宙 更新时间:2023-11-04 11:32:39 24 4
gpt4 key购买 nike

我正在尝试编写一个无互斥(但不是无锁)队列,它使用连续的内存范围作为循环缓冲区和四个指针:两个用于消费者,两个用于生产者。它在最新推送的元素之后保留一个空格,以消除满队列和空队列之间的歧义。这是实现:

template <typename T, typename Allocator = std::allocator<T>>
class concurrent_queue
{
protected:
T *storage;
std::size_t s;
std::atomic<T*> consumer_head, producer_head;

union alignas(16) dpointer
{
struct
{
T *ptr;
std::size_t cnt;
};
__int128 val;
};

dpointer consumer_pending, producer_pending;

Allocator alloc;

public:
concurrent_queue(std::size_t s): storage(nullptr), consumer_head(nullptr), producer_head(nullptr)
{
storage = alloc.allocate(s+1);

consumer_head = storage;
__atomic_store_n(&(consumer_pending.val), (dpointer{storage, 0}).val, __ATOMIC_SEQ_CST);

producer_head = storage;
__atomic_store_n(&(producer_pending.val), (dpointer{storage, 0}).val, __ATOMIC_SEQ_CST);

this->s = s + 1;
}
~concurrent_queue()
{
while(consumer_head != producer_head)
{
alloc.destroy(consumer_head.load());
++consumer_head;
if(consumer_head == storage + s)
consumer_head = storage;
}
alloc.deallocate(storage, s);
}

template <typename U>
bool push(U&& e)
{
while(true)
{
dpointer a;
a.val = __atomic_load_n(&(producer_pending.val), __ATOMIC_RELAXED);
std::atomic_thread_fence(std::memory_order_acquire);
auto b = consumer_head.load(std::memory_order_relaxed);

auto next = a.ptr + 1;
if(next == storage + s) next = storage;

if(next == b) continue;
dpointer newval{next, a.cnt+1};
if(!__atomic_compare_exchange_n(&(producer_pending.val), &(a.val), (newval.val), true, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) continue;

alloc.construct(a.ptr, std::forward<U>(e));

while(!producer_head.compare_exchange_weak(a.ptr, next, std::memory_order_release, std::memory_order_relaxed));
return true;
}
}

template <typename U>
bool pop(U& result)
{
while(true)
{
dpointer a;
a.val = __atomic_load_n(&(consumer_pending.val), __ATOMIC_RELAXED);
std::atomic_thread_fence(std::memory_order_acquire);
auto b = producer_head.load(std::memory_order_relaxed);

auto next = a.ptr + 1;
if(next == storage + s) next = storage;

if(a.ptr == b) continue;
dpointer newval{next, a.cnt+1};
if(!__atomic_compare_exchange_n(&(consumer_pending.val), &(a.val), (newval.val), true, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) continue;

result = std::move(*(a.ptr));
alloc.destroy(a.ptr);

while(!consumer_head.compare_exchange_weak(a.ptr, next, std::memory_order_release, std::memory_order_relaxed));
return true;
}
}
};

然而,当使用相同数量的独立推送和弹出线程进行测试时,每个推送/弹出相等的、预定数量的元素在终止之前,一些弹出线程有时(不总是)卡在第一个 CAS 处执行中的某个点并且永远不会终止,即使在所有推送线程终止之后也是如此。由于它们尝试弹出与推送线程推送的元素数量相同的元素,因此我怀疑在某个时候推送线程中发生了覆盖。

这是我第一次尝试编写并发容器,所以我对此非常缺乏经验......我已经盯着它看了一段时间,但一直没能弄清楚哪里出了问题。对此更有经验的人可以发现问题吗?

此外,是否有任何平台特定的方式来获得双倍宽度的 CAS?

最佳答案

编辑: 大多数内容是这篇文章实际上是错误的。查看评论。

        dpointer a;
a.val = __atomic_load_n(&(producer_pending.val), __ATOMIC_RELAXED);
std::atomic_thread_fence(std::memory_order_acquire);
auto b = consumer_head.load(std::memory_order_relaxed);

您绝对确定这会按照您的想法行事吗?此代码段将 a.val 排在 b 之前。

std::atomic_thread_fence(std::memory_order_acquire);保证在栅栏之后 的内存读取操作不会在栅栏之前 重新排序。但是没有什么能阻止栅栏上方的内存操作流到栅栏下方。编译器可以完全自由地将获取栅栏向上移动到它想要的位置,只要它不与其他栅栏重新排序即可。

更抽象:

a = load relaxed
memory fence acquire -- memory operations below this line may not float upwards
b = load relaxed

这个编译器可能会把它转换成这样:

memory fence acquire
b = load relaxed
a = load relaxed

但不是这个:

a = load relaxed
b = load relaxed
memory fence acquire

此外,您应该真正避免内存栅栏,并在操作本身上添加获取/释放。这通常会为非 x86 目标生成更好的代码。对于 x86,这并不重要,因为在各种情况下,即使是普通的 mov 也足以提供顺序一致性。

关于c++ - 在并发队列中覆盖,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24087577/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com