gpt4 book ai didi

c++ - 由无锁容器管理的缓冲区的完整性

转载 作者:塔克拉玛干 更新时间:2023-11-03 07:14:30 24 4
gpt4 key购买 nike

(澄清两个误解的答案:如果生产者线程的数量小于堆栈大小,代码运行良好。只有 1 个消费者释放槽。我用 32 个生产者 VS 16 个槽调整这个演示的方式是触发很快就坏了)

在对用于多线程缓冲区管理的无锁栈进行压力测试时,我发现无法保证缓冲区内容的完整性。我现在很确定堆栈/后进先出解决方案不是最佳选择;但我仍然想了解这些缓冲区是如何被破坏的。

这个想法是:一个无锁堆栈,其中包含指向“空闲”缓冲区的指针。它们可以由许多生产者线程中的一个检索。然后缓冲区充满数据并“分派(dispatch)”到单个消费者线程,最终将它们返回到堆栈。

观察结果是:- 两个线程以某种方式获得相同的缓冲区。- 一个线程正在获取一个缓冲区,其内存仍未从刚刚释放它的其他线程中清除。

这是我为了演示目的可以放在一起的最简单的例子:

更新:我为任何想玩它的人制作了一个具有更好调试输出的版本,这里:https://ideone.com/v9VAqU

#include <atomic>
#include <assert.h>
#include <chrono>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

using namespace std;

#define N_SLOTS 16
#define N_THREADS 32

// The data buffers that are shared among threads
class Buffer { public: int data[N_THREADS] = {0}; } buffers[N_SLOTS];

// The lock-free stack under study
class LockFreeStack
{
Buffer* stack[N_SLOTS];
atomic_int free_slots, out_of_slots, retries;
public:
LockFreeStack() : free_slots(0), out_of_slots(0), retries(0) {
for (int i=0; i<N_SLOTS; i++)
release_buffer(&buffers[i]);
}
Buffer* get_buffer()
{
int slot = --free_slots;
if (slot < 0) {
out_of_slots++;
return nullptr;
}
/// [EDIT] CAN GET PREEMPTED RIGHT HERE, BREAKING ATOMICITY!
return stack[slot];
}
void release_buffer(Buffer* buf)
{
int slot;
while(true) {
slot = free_slots;
if (slot <= 0) {
stack[0] = buf;
free_slots = 1;
break;
}
stack[slot] = buf;
if (free_slots++ == slot)
break;
retries++;
}
}
ostream& toStream(ostream& oss) {
return oss << "LockFreeStack with free_slots=" << free_slots << ", oos=" << out_of_slots << ", retries=" << retries;
}
} lockFreeStack;

// Utility class to help with test
class PrintQueue {
queue<Buffer*> q;
mutex m;
public:
void add(Buffer* buf) {
lock_guard<mutex> lock(m);
q.push(buf);
}
Buffer* pop() {
lock_guard<mutex> lock(m);
Buffer* buf;
if (q.empty())
return nullptr;
buf = q.front();
q.pop();
return buf;
}
} printQueue;

int main()
{
vector<thread> workers;
for (int t = 0; t < N_THREADS; ++t) {
workers.push_back(thread([&,t] {
while(true) {
auto buf = lockFreeStack.get_buffer();
if (buf) {
buf->data[t] = t;
this_thread::sleep_for(chrono::milliseconds(10));
printQueue.add(buf);
}
}
}));
}
while(true) {
this_thread::sleep_for(chrono::milliseconds(10));
lockFreeStack.toStream(cout) << endl;
Buffer *buf;
while((buf = printQueue.pop())) {
cout << "Got Buffer " << buf << " #" << (buf-buffers) << " { ";
int used = 0;
for(int t=0; t<N_THREADS; t++)
if (buf->data[t]) {
used += 1;
cout << 't' << buf->data[t] << ' ';
buf->data[t] = 0;
}
cout << "}\n";
assert (used == 1);
lockFreeStack.release_buffer(buf);
}
}
return 0;
}

以及错误输出的示例:

> LockFreeStack with free_slots=-2454858, oos=2454836, retries=0
> Got Buffer 0x604a40 #12 { t7 }
> Got Buffer 0x6049c0 #11 { t8 }
> Got Buffer 0x604b40 #14 { t1 }
> Got Buffer 0x604bc0 #15 { }
> test.cpp:111: int main(): Assertion `used == 1' failed.

我已经尝试在所有地方使用 std::atomic_thread_fence() 但它没有任何区别。

错在哪里?

(顺便说一句,已使用多个版本的 GCC 进行测试,包括 5.2 和 4.6)

最佳答案

您的 LockFreeStack 代码已完全损坏。

从 2 个线程同时调用的

release_buffer 可以将 2 个指针粘在同一个槽中,因此丢失一个。

if (free_slots++ == slot) 将仅对一个线程成功,因此另一个线程将再次尝试并将其指针放入另一个插槽。但它也可能是在第一个插槽中获胜的那个,所以你得到相同的但在 2 个插槽中。

您可以通过 1 个线程调用 release_buffer 和另一个线程调用 get_buffer 获得相同的效果。这些情况中的一种或两种都会导致您的腐败。

release_buffer 不受 stack 大小的限制,因此预计缓冲区会溢出,然后一切都会崩溃。

我建议:

  1. release_buffer 首先选择一个唯一原子,然后写入它。

  2. 当多个释放器竞争插槽时,插槽中指针的写入顺序不是保证,因此您需要一些其他方法来将插槽标记为在 release_buffer 上有效,并在上标记为无效获取缓冲区。最简单的方法是在 get_buffer 中将其设为 null。

  3. 将计数器绑定(bind)到堆栈的大小。如果您不能通过一个原子操作完成它,请复制一份,进行所有更改,然后将其还原。

编辑

下面是将同一个缓冲区返回到 2 个单元格中的场景:

                                  ////T==0  free_slots==5

// thread 1
void release_buffer(Buffer* buf) ////T==1 buf==buffers[7]
{
int slot;
while(true) { //// 1st iteration
slot = free_slots; ////T==2 free_slots==5 slot==5
if (slot <= 0) {
stack[0] = buf;
free_slots = 1;
break;
} ////*** note other threads below ***
stack[slot] = buf; //// stack[5]==buffers[7]
if (free_slots++ == slot) ////T==5 free_slots==4 slot==5 ---> go for another round
break;
retries++;
}
while(true) { //// 2nd iteration
slot = free_slots; ////T==6 free_slots==4 slot==4
if (slot <= 0) {
stack[0] = buf;
free_slots = 1;
break;
}
stack[slot] = buf; //// stack[4]==buffers[7] //// BOOM!!!!
if (free_slots++ == slot) ////T==7 free_slots==5 slot==4 ---> no other round
break;
retries++;
}
}

// thread 2
Buffer* get_buffer() // thread
{
int slot = --free_slots; ////T==3 free_slots==4
if (slot < 0) {
out_of_slots++;
return nullptr;
}
return stack[slot];
}

// thread 3
Buffer* get_buffer()
{
int slot = --free_slots; ////T==4 free_slots==3
if (slot < 0) {
out_of_slots++;
return nullptr;
}
return stack[slot];
}

编辑 2:断言失败...

如果你现在还没有找到它,就在这里:

//// producer t==0
buf->data[t] = t; //// buf->data[t] == 0

//consumer
for(int t=0; t<N_THREADS; t++) // first iteration, t==0
if (buf->data[t]) { //// buf->data[t] == 0, branch not taken
used += 1;
...
//// used remains ==0 -----> assert fails

在缓冲区中写入 t+1 将修复它。

关于c++ - 由无锁容器管理的缓冲区的完整性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35677513/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com