- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
(澄清两个误解的答案:如果生产者线程的数量小于堆栈大小,代码运行良好。只有 1 个消费者释放槽。我用 32 个生产者 VS 16 个槽调整这个演示的方式是触发很快就坏了)
在对用于多线程缓冲区管理的无锁栈进行压力测试时,我发现无法保证缓冲区内容的完整性。我现在很确定堆栈/后进先出解决方案不是最佳选择;但我仍然想了解这些缓冲区是如何被破坏的。
这个想法是:一个无锁堆栈,其中包含指向“空闲”缓冲区的指针。它们可以由许多生产者线程中的一个检索。然后缓冲区充满数据并“分派(dispatch)”到单个消费者线程,最终将它们返回到堆栈。
观察结果是:- 两个线程以某种方式获得相同的缓冲区。- 一个线程正在获取一个缓冲区,其内存仍未从刚刚释放它的其他线程中清除。
这是我为了演示目的可以放在一起的最简单的例子:
更新:我为任何想玩它的人制作了一个具有更好调试输出的版本,这里:https://ideone.com/v9VAqU
#include <atomic>
#include <assert.h>
#include <chrono>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
using namespace std;
#define N_SLOTS 16
#define N_THREADS 32
// The data buffers that are shared among threads
class Buffer { public: int data[N_THREADS] = {0}; } buffers[N_SLOTS];
// The lock-free stack under study
class LockFreeStack
{
Buffer* stack[N_SLOTS];
atomic_int free_slots, out_of_slots, retries;
public:
LockFreeStack() : free_slots(0), out_of_slots(0), retries(0) {
for (int i=0; i<N_SLOTS; i++)
release_buffer(&buffers[i]);
}
Buffer* get_buffer()
{
int slot = --free_slots;
if (slot < 0) {
out_of_slots++;
return nullptr;
}
/// [EDIT] CAN GET PREEMPTED RIGHT HERE, BREAKING ATOMICITY!
return stack[slot];
}
void release_buffer(Buffer* buf)
{
int slot;
while(true) {
slot = free_slots;
if (slot <= 0) {
stack[0] = buf;
free_slots = 1;
break;
}
stack[slot] = buf;
if (free_slots++ == slot)
break;
retries++;
}
}
ostream& toStream(ostream& oss) {
return oss << "LockFreeStack with free_slots=" << free_slots << ", oos=" << out_of_slots << ", retries=" << retries;
}
} lockFreeStack;
// Utility class to help with test
class PrintQueue {
queue<Buffer*> q;
mutex m;
public:
void add(Buffer* buf) {
lock_guard<mutex> lock(m);
q.push(buf);
}
Buffer* pop() {
lock_guard<mutex> lock(m);
Buffer* buf;
if (q.empty())
return nullptr;
buf = q.front();
q.pop();
return buf;
}
} printQueue;
int main()
{
vector<thread> workers;
for (int t = 0; t < N_THREADS; ++t) {
workers.push_back(thread([&,t] {
while(true) {
auto buf = lockFreeStack.get_buffer();
if (buf) {
buf->data[t] = t;
this_thread::sleep_for(chrono::milliseconds(10));
printQueue.add(buf);
}
}
}));
}
while(true) {
this_thread::sleep_for(chrono::milliseconds(10));
lockFreeStack.toStream(cout) << endl;
Buffer *buf;
while((buf = printQueue.pop())) {
cout << "Got Buffer " << buf << " #" << (buf-buffers) << " { ";
int used = 0;
for(int t=0; t<N_THREADS; t++)
if (buf->data[t]) {
used += 1;
cout << 't' << buf->data[t] << ' ';
buf->data[t] = 0;
}
cout << "}\n";
assert (used == 1);
lockFreeStack.release_buffer(buf);
}
}
return 0;
}
以及错误输出的示例:
> LockFreeStack with free_slots=-2454858, oos=2454836, retries=0
> Got Buffer 0x604a40 #12 { t7 }
> Got Buffer 0x6049c0 #11 { t8 }
> Got Buffer 0x604b40 #14 { t1 }
> Got Buffer 0x604bc0 #15 { }
> test.cpp:111: int main(): Assertion `used == 1' failed.
我已经尝试在所有地方使用 std::atomic_thread_fence()
但它没有任何区别。
错在哪里?
(顺便说一句,已使用多个版本的 GCC 进行测试,包括 5.2 和 4.6)
最佳答案
您的 LockFreeStack 代码已完全损坏。
从 2 个线程同时调用的release_buffer
可以将 2 个指针粘在同一个槽中,因此丢失一个。
if (free_slots++ == slot)
将仅对一个线程成功,因此另一个线程将再次尝试并将其指针放入另一个插槽。但它也可能是在第一个插槽中获胜的那个,所以你得到相同的但在 2 个插槽中。
您可以通过 1 个线程调用 release_buffer
和另一个线程调用 get_buffer
获得相同的效果。这些情况中的一种或两种都会导致您的腐败。
release_buffer
不受 stack
大小的限制,因此预计缓冲区会溢出,然后一切都会崩溃。
我建议:
release_buffer
首先选择一个唯一槽原子,然后写入它。
当多个释放器竞争插槽时,插槽中指针的写入顺序不是保证,因此您需要一些其他方法来将插槽标记为在 release_buffer
上有效,并在上标记为无效获取缓冲区
。最简单的方法是在 get_buffer
中将其设为 null。
将计数器绑定(bind)到堆栈的大小。如果您不能通过一个原子操作完成它,请复制一份,进行所有更改,然后将其还原。
编辑
下面是将同一个缓冲区返回到 2 个单元格中的场景:
////T==0 free_slots==5
// thread 1
void release_buffer(Buffer* buf) ////T==1 buf==buffers[7]
{
int slot;
while(true) { //// 1st iteration
slot = free_slots; ////T==2 free_slots==5 slot==5
if (slot <= 0) {
stack[0] = buf;
free_slots = 1;
break;
} ////*** note other threads below ***
stack[slot] = buf; //// stack[5]==buffers[7]
if (free_slots++ == slot) ////T==5 free_slots==4 slot==5 ---> go for another round
break;
retries++;
}
while(true) { //// 2nd iteration
slot = free_slots; ////T==6 free_slots==4 slot==4
if (slot <= 0) {
stack[0] = buf;
free_slots = 1;
break;
}
stack[slot] = buf; //// stack[4]==buffers[7] //// BOOM!!!!
if (free_slots++ == slot) ////T==7 free_slots==5 slot==4 ---> no other round
break;
retries++;
}
}
// thread 2
Buffer* get_buffer() // thread
{
int slot = --free_slots; ////T==3 free_slots==4
if (slot < 0) {
out_of_slots++;
return nullptr;
}
return stack[slot];
}
// thread 3
Buffer* get_buffer()
{
int slot = --free_slots; ////T==4 free_slots==3
if (slot < 0) {
out_of_slots++;
return nullptr;
}
return stack[slot];
}
编辑 2:断言失败...
如果你现在还没有找到它,就在这里:
//// producer t==0
buf->data[t] = t; //// buf->data[t] == 0
//consumer
for(int t=0; t<N_THREADS; t++) // first iteration, t==0
if (buf->data[t]) { //// buf->data[t] == 0, branch not taken
used += 1;
...
//// used remains ==0 -----> assert fails
在缓冲区中写入 t+1 将修复它。
关于c++ - 由无锁容器管理的缓冲区的完整性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35677513/
这个问题在这里已经有了答案: Possible to make an event handler wait until async / Promise-based code is done? (2
我经常有多个运行的进程(R,Python,eshell/shell),对于每个进程,我经常都有一个相关的脚本,可以从中发送摘要。为此,我通常将每个框架垂直地分成两个窗口,以便脚本文件(例如.py)位于
如何修改 emacs 在关闭缓冲区后选择要显示的缓冲区的方式? 当我有多个列显示相同的缓冲区,然后在其中一个缓冲区中打开另一个文件,然后关闭新打开的缓冲区时,它不会切换回前一个缓冲区,而是切换到另一个
如何将 ex 命令复制到剪贴板或粘贴到缓冲区? 在 Windows 上使用 gvim。 最佳答案 windows剪贴板可以通过the buffer + 访问.因此,可以使用 + 将剪贴板粘贴为前命令。
在 javascript 中如何以比以下更简单的方式获取 b 缓冲区? var num=6553599 var a = new Buffer(4); a.writeInt32LE(num)
每次我在 Google 上搜索有关 OpenGL 编程的文章时,我都会找到一些文章,但似乎所有文章都提到了着色器和缓冲区。那些是什么?你能解释其中的一些吗: 深度缓冲区 模板缓冲区 像素着色器 帧缓冲
我有java考试,当我学习时,我看到了这个练习,我尝试解决它,但我发现一些困难,所以请帮助我考虑实用程序中方法的以下注释、 header 和部分代码名为 Atbash 的加密类。 /**
每次我在 Google 上搜索有关 OpenGL 编程的文章时,我都会找到一些文章,但似乎所有文章都提到了着色器和缓冲区。那些是什么?你能解释其中的一些吗: 深度缓冲区 模板缓冲区 像素着色器 帧缓冲
对于每个属性使用跨步顶点缓冲区与紧密打包缓冲区有何优缺点?我的意思是例如: 步幅:xyzrgb xyzrgb xyzrgb 紧:xyzxyzxyz rgbrgbrgb 乍一看,使用步幅时您似乎可以轻松
我正在尝试将文本文件中每行的数字读取到 ArrayList 中。当我执行以下函数时,它总是跳过最后一个元素。有人可以帮我吗?因为我在这里没有遇到问题,因为它读取直到缓冲区为空,所以他应该在到达 Fil
#include #include int main () { time_t time_raw_format; struct tm * ptr_time; char *buff
基本上我有一个包含不同类型数据的自定义结构。例如: typedef struct example_structure{ uint8_t* example_1[4]; int example_2[4];
我之前的列表实现是一个简单的 LinearLayout,位于一个装满我的项目的 ScrollView 中。 我切换到 ListView 的 Android 实现以简单地使用 CursorAdapter
我想创建一个可变长度的输入事件窗口/缓冲区,当它接收到额外的事件时会变长。 这是为了实现“键入时搜索”功能。我想捕获点击,但为了不给服务器造成压力,我想明智地进行服务调用。 我想到的逻辑是缓冲击键,从
我想将 yuv420P 像素写入缓冲区而不是二进制文件。假设我在指针中存储了 luma 、 Cb 和 Cr。 luma = output_pixel.luma; cb = output_pixel.c
我想在 Go 中构建一个支持多个并发读取器和一个写入器的缓冲区。所有写入缓冲区的内容都应由所有读者读取。允许新读者随时加入,这意味着已经写入的数据必须能够为迟到的读者回放。 缓冲区应满足以下接口(in
本文转载自微信公众号「小明菜市场」,作者小明菜市场。转载本文请联系小明菜市场公众号。 前言 Java NIO 需要理解的主要有缓冲区,通道,选择器,这三个主要的部分。 基础
一 点睛 NIO,可以称为 New IO 或 Non Blocking IO,是在 JDK 1.4 后提供的新 API。传统的I/O 是阻塞式的 I/O、面向流的操作;而 NIO 是非阻塞 I/O 、
我正在寻找一种切换到包含搜索文本的缓冲区的方法。 例如。如果我打开了 100 个缓冲区,我想切换到一个包含 'fooBar = 1' 的缓冲区 最佳答案 我写了一个 Vim 插件来做到这一点:buff
我正在尝试将提取的视频帧(我使用 ffmpeg)推送到 FFMPEG 缓冲区中。我已经查看了 ffmpeg 的缓冲区源文件,例如 buffersrc.c 和 fifo.c,以确定我是否可以这样做,但我
我是一名优秀的程序员,十分优秀!