gpt4 book ai didi

c++ - 尝试实现无锁队列时发生堆栈溢出

转载 作者:塔克拉玛干 更新时间:2023-11-03 02:08:15 24 4
gpt4 key购买 nike


我根据 Maged M. Michael 和 Michael L. Scott 工作中指定的算法实现了一个无锁队列 Simple, Fast, and Practical Non-Blocking and BlockingConcurrent Queue Algorithms (算法请跳转到第4页)

我在 shared_ptr 上使用了原子操作,例如 std::atomic_load_explicit 等。

当只在一个线程中使用队列时,一切都很好,但是当从不同线程中使用它时,我得到一个堆栈溢出异常。

不幸的是,我无法追查问题的根源。似乎当一个 shared_ptr 超出范围时,它会减少下一个 ConcurrentQueueNode 的引用数量,并导致无限递归,但我不明白为什么..

代码:

队列节点:

template<class T>
struct ConcurrentQueueNode {
T m_Data;
std::shared_ptr<ConcurrentQueueNode> m_Next;

template<class ... Args>
ConcurrentQueueNode(Args&& ... args) :
m_Data(std::forward<Args>(args)...) {}

std::shared_ptr<ConcurrentQueueNode>& getNext() {
return m_Next;
}

T getValue() {
return std::move(m_Data);
}

};

并发队列(注意:不适合胆小的人):

template<class T>
class ConcurrentQueue {
std::shared_ptr<ConcurrentQueueNode<T>> m_Head, m_Tail;

public:

ConcurrentQueue(){
m_Head = m_Tail = std::make_shared<ConcurrentQueueNode<T>>();
}

template<class ... Args>
void push(Args&& ... args) {
auto node = std::make_shared<ConcurrentQueueNode<T>>(std::forward<Args>(args)...);
std::shared_ptr<ConcurrentQueueNode<T>> tail;

for (;;) {
tail = std::atomic_load_explicit(&m_Tail, std::memory_order_acquire);
std::shared_ptr<ConcurrentQueueNode<T>> next =
std::atomic_load_explicit(&tail->getNext(),std::memory_order_acquire);

if (tail == std::atomic_load_explicit(&m_Tail, std::memory_order_acquire)) {
if (next.get() == nullptr) {
auto currentNext = std::atomic_load_explicit(&m_Tail, std::memory_order_acquire)->getNext();
auto res = std::atomic_compare_exchange_weak(&tail->getNext(), &next, node);
if (res) {
break;
}
}
else {
std::atomic_compare_exchange_weak(&m_Tail, &tail, next);
}
}
}

std::atomic_compare_exchange_strong(&m_Tail, &tail, node);
}

bool tryPop(T& dest) {
std::shared_ptr<ConcurrentQueueNode<T>> head;
for (;;) {
head = std::atomic_load_explicit(&m_Head, std::memory_order_acquire);
auto tail = std::atomic_load_explicit(&m_Tail,std::memory_order_acquire);
auto next = std::atomic_load_explicit(&head->getNext(), std::memory_order_acquire);

if (head == std::atomic_load_explicit(&m_Head, std::memory_order_acquire)) {
if (head.get() == tail.get()) {
if (next.get() == nullptr) {
return false;
}
std::atomic_compare_exchange_weak(&m_Tail, &tail, next);
}
else {
dest = next->getValue();
auto res = std::atomic_compare_exchange_weak(&m_Head, &head, next);
if (res) {
break;
}
}
}
}

return true;
}
};

重现问题的示例用法:

int main(){
ConcurrentQueue<int> queue;
std::thread threads[4];

for (auto& thread : threads) {
thread = std::thread([&queue] {

for (auto i = 0; i < 100'000; i++) {
queue.push(i);
int y;
queue.tryPop(y);
}
});
}

for (auto& thread : threads) {
thread.join();
}
return 0;
}

最佳答案

问题是竞争条件会导致队列中的每个节点都等待一次全部释放 - 这是递归的并且会破坏您的堆栈。

如果您将测试更改为仅使用一个线程但不弹出,您每次都会遇到相同的堆栈溢出错误。

for (auto i = 1; i < 100000; i++) {
queue.push(i);
//int y;
//queue.tryPop(y);
}

您需要非递归地删除节点链:

__forceinline ~ConcurrentQueueNode() {
if (!m_Next || m_Next.use_count() > 1)
return;
KillChainOfDeath();
}
void KillChainOfDeath() {
auto pThis = this;
std::shared_ptr<ConcurrentQueueNode> Next, Prev;
while (1) {
if (pThis->m_Next.use_count() > 1)
break;
Next.swap(pThis->m_Next); // unwire node
Prev = NULL; // free previous node that we unwired in previous loop
if (!(pThis = Next.get())) // move to next node
break;
Prev.swap(Next); // else Next.swap will free before unwire.
}
}

我以前从未使用过shared_ptr,所以我不知道是否有更快的方法来做到这一点。另外,由于我以前从未使用过 shared_ptr,所以我不知道你的算法是否会遇到 ABA 问题。除非在 shared_ptr 实现中有一些特殊的东西来防止 ABA,否则我担心以前释放的节点可以被重用,从而欺骗 CAS。不过,当我运行您的代码时,我似乎从来没有遇到过这个问题。

关于c++ - 尝试实现无锁队列时发生堆栈溢出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38097572/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com