c++ - Ring buffer with atomic indexes


I have been struggling with what must be a fundamental misunderstanding of how atomics work in C++. I have written the code below, which implements a fast ring buffer using atomic variables for indexes so that multiple threads can write to and read from the buffer. I've whittled the code down to this simple case (which I realize is still a little long. Sorry.). If I run this on either Linux or Mac OS X, it will work some of the time, but it will also throw exceptions at least 10% of the time. It also seems to run very fast sometimes, then slow down, and maybe even speed up again, which also suggests something is not quite right. I cannot understand the flaw in my logic. Do I need a fence somewhere?

Here is a simple description of what the code is trying to do: an atomic index variable is incremented using the compare_exchange_weak method. This guarantees exclusive access to the slot the index was bumped from. Two indexes are actually needed so that values are not overwritten as we wrap around the ring buffer. More details are included as comments in the code.

#include <mutex>
#include <atomic>
#include <chrono>   // for the std::chrono duration passed to sleep_for
#include <iostream>
#include <cstdint>
#include <vector>
#include <thread>
using namespace std;


const uint64_t Nevents = 1000000;
std::atomic<uint64_t> Nwritten(0);
std::atomic<uint64_t> Nread(0);
#define MAX_EVENTS 10

mutex MTX;

std::atomic<uint32_t> iread{0}; // The slot that the next thread will try to read from
std::atomic<uint32_t> iwrite{0}; // The slot that the next thread will try to write to
std::atomic<uint32_t> ibegin{0}; // The slot indicating the beginning of the read region
std::atomic<uint32_t> iend{0}; // The slot indicating one-past-the-end of the read region
std::atomic<uint64_t> EVENT_QUEUE[MAX_EVENTS];

//-------------------------------
// WriteThreadATOMIC
//-------------------------------
void WriteThreadATOMIC(void)
{
    MTX.lock();
    MTX.unlock();

    while( Nwritten < Nevents ){

        // Copy (atomic) iwrite index to local variable and calculate index
        // of next slot after it
        uint32_t idx = iwrite;
        uint32_t inext = (idx + 1) % MAX_EVENTS;
        if(inext == ibegin){
            // Queue is full
            continue;
        }

        // At this point it looks like slot "idx" is available to write to.
        // The next call ensures only one thread actually does write to it
        // since the compare_exchange_weak will succeed for only one.
        if(iwrite.compare_exchange_weak(idx, inext))
        {
            // OK, we've claimed exclusive access to the slot. We've also
            // bumped the iwrite index so another writer thread can try
            // writing to the next slot. Now we write to the slot.
            if(EVENT_QUEUE[idx] != 0) {lock_guard<mutex> lck(MTX); cerr<<__FILE__<<":"<<__LINE__<<endl; throw -1;} // Dummy check. This should NEVER happen!
            EVENT_QUEUE[idx] = 1;
            Nwritten++;

            if(Nread>Nwritten) {lock_guard<mutex> lck(MTX); cerr<<__FILE__<<":"<<__LINE__<<endl; throw -3;} // Dummy check. This should NEVER happen!

            // The idx slot now contains valid data so bump the iend index to
            // let reader threads know. Note: if multiple writer threads are
            // in play, this may spin waiting for another to bump iend to us
            // before we can bump it to the next slot.
            uint32_t save_idx = idx;
            while(!iend.compare_exchange_weak(idx, inext)) idx = save_idx;
        }
    }
    lock_guard<mutex> lck(MTX);
    cout << "WriteThreadATOMIC done" << endl;
}

//-------------------------------
// ReadThreadATOMIC
//-------------------------------
void ReadThreadATOMIC(void)
{
    MTX.lock();
    MTX.unlock();

    while( Nread < Nevents ){

        uint32_t idx = iread;
        if(idx == iend) {
            // Queue is empty
            continue;
        }
        uint32_t inext = (idx + 1) % MAX_EVENTS;

        // At this point it looks like slot "idx" is available to read from.
        // The next call ensures only one thread actually does read from it
        // since the compare_exchange_weak will succeed for only one.
        if( iread.compare_exchange_weak(idx, inext) )
        {
            // Similar to above, we now have exclusive access to this slot
            // for reading.
            if(EVENT_QUEUE[idx] != 1) {lock_guard<mutex> lck(MTX); cerr<<__FILE__<<":"<<__LINE__<<endl; throw -2;} // Dummy check. This should NEVER happen!
            EVENT_QUEUE[idx] = 0;
            Nread++;

            if(Nread>Nwritten) {lock_guard<mutex> lck(MTX); cerr<<__FILE__<<":"<<__LINE__<<endl; throw -4;} // Dummy check. This should NEVER happen!

            // Bump ibegin freeing idx up for writing
            uint32_t save_idx = idx;
            while(!ibegin.compare_exchange_weak(idx, inext)) idx = save_idx;
        }
    }
    lock_guard<mutex> lck(MTX);
    cout << "ReadThreadATOMIC done" << endl;
}

//-------------------------------
// main
//-------------------------------
int main(int narg, char *argv[])
{
    int Nwrite_threads = 4;
    int Nread_threads = 4;

    for(int i=0; i<MAX_EVENTS; i++) EVENT_QUEUE[i] = 0;

    MTX.lock(); // Hold off threads until all are created

    // Launch writer and reader threads
    vector<std::thread *> atomic_threads;
    for(int i=0; i<Nwrite_threads; i++){
        atomic_threads.push_back( new std::thread(WriteThreadATOMIC) );
    }
    for(int i=0; i<Nread_threads; i++){
        atomic_threads.push_back( new std::thread(ReadThreadATOMIC) );
    }

    // Release all threads and wait for them to finish
    MTX.unlock();
    while( Nread < Nevents) {
        std::this_thread::sleep_for(std::chrono::microseconds(1000000));
        cout << "Nwritten: " << Nwritten << " Nread: " << Nread << endl;
    }

    // Join threads
    for(auto t : atomic_threads) t->join();
}

When I catch this in the debugger, it is usually because of a wrong value in an EVENT_QUEUE slot. Sometimes, though, the Nread count exceeds Nwritten, which seems impossible. I didn't think I needed fences since everything is atomic, but I can't say that now, because I have to question everything I thought I knew.

Any suggestions or insight would be appreciated.

Best Answer

I have built this exact structure before, and your implementation is nearly identical to what I had, which also had problems. The problems come down to the fact that ring buffers, because they continually reuse the same memory, are particularly susceptible to ABA problems.

In case you are not familiar with it, an ABA problem is where you acquire value A, then later check that the value is still A to make sure you are still in a good state, but unbeknownst to you, the value actually changed from A to B and then back to A.
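In code, the hazard looks something like this (a minimal self-contained sketch with hypothetical names, not the code from the question):

#include <atomic>
#include <cstdint>

const uint32_t NSLOTS = 10;
std::atomic<uint32_t> head{0};

void aba_victim(void)
{
    uint32_t observed = head.load();         // read value A
    uint32_t next = (observed + 1) % NSLOTS;
    // Suppose this thread is descheduled here and other threads advance
    // head all the way around the ring until it equals A again. The CAS
    // below only checks that head currently equals A; it cannot tell that
    // head left A and came back, so it succeeds even though any decision
    // based on the earlier read (e.g. "the queue is not full") is stale.
    head.compare_exchange_strong(observed, next);
}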

I'll point out a scenario in your writer; the reader has the same problem:

// Here you check if you can even do the write; let's say it succeeds.
uint32_t idx = iwrite;
uint32_t inext = (idx + 1) % MAX_EVENTS;
if(inext == ibegin)
    continue;

// Here you do a compare exchange to ensure that nothing has changed
// out from under you. But let's say your thread gets unscheduled here,
// giving time for plenty of other reads and writes to occur, enough
// writes that the buffer wraps around until iwrite is back to where it
// was. The compare exchange can then succeed, but your fullness check
// above may no longer hold!
if(iwrite.compare_exchange_weak(idx, inext))
{
    ...

I don't know whether there is a better way to solve this, but I think adding an extra check after the exchange would still be problematic. I ultimately resolved it by adding additional atomics that tracked write-reserved and read-reserved counts, so that even when the buffer wrapped around, I could guarantee the space was still good to work with. There may be other solutions as well.
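The answer doesn't show code for that fix, but one way to realize the idea is with free-running 64-bit reservation counters whose values never repeat, so a CAS can never observe a recycled value. This is a sketch under my own assumptions, not the answerer's actual implementation:

#include <atomic>
#include <cstdint>

const uint64_t NSLOTS = 10;
std::atomic<uint64_t> write_reserved{0}; // total write reservations ever made
std::atomic<uint64_t> read_reserved{0};  // total read reservations ever made

// Try to claim a slot for writing; returns true and sets `slot` on success.
bool try_reserve_write(uint64_t &slot)
{
    uint64_t w = write_reserved.load();
    for(;;){
        // Full when writers are a whole buffer ahead of readers.
        if(w - read_reserved.load() >= NSLOTS) return false;
        // The counter only ever increases, so each successful CAS installs
        // a value that has never been seen before; wrapping the ring cannot
        // reproduce an old counter value, which defeats the ABA problem.
        if(write_reserved.compare_exchange_weak(w, w + 1)){
            slot = w % NSLOTS; // only this derived index wraps
            return true;
        }
        // On failure, w was reloaded with the current value; re-check fullness.
    }
}

A complete queue would still need a publish/consume handshake per slot (the role iend and ibegin play in the question) before data is read or a slot is reused; the sketch covers only the reservation step.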

Disclaimer: this may not be your only problem.

This post, c++ - Ring buffer with atomic indexes, comes from a question on Stack Overflow: https://stackoverflow.com/questions/53038437/
