- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
我在 $work 有一个应用程序,我必须在两个以不同频率调度的实时线程之间移动。 (实际调度超出了我的控制。)应用程序是硬实时的(其中一个线程必须驱动硬件接口(interface)),因此线程之间的数据传输应该是无锁和无等待的尽可能。
需要注意的是,只需要传输一个数据块:因为两个线程的运行速率不同,所以在较慢线程的两次唤醒之间,会出现较快线程的两次迭代完成的情况;在这种情况下,可以覆盖写入缓冲区中的数据,以便较慢的线程仅获取最新数据。
换句话说,代替队列,双缓冲解决方案就足够了。这两个缓冲区是在初始化期间分配的,读写线程可以调用类的方法来获取指向这些缓冲区之一的指针。
C++代码:
#include <mutex>
template <typename T>
class ProducerConsumerDoubleBuffer {
public:
ProducerConsumerDoubleBuffer() {
m_write_busy = false;
m_read_idx = m_write_idx = 0;
}
~ProducerConsumerDoubleBuffer() { }
// The writer thread using this class must call
// start_writing() at the start of its iteration
// before doing anything else to get the pointer
// to the current write buffer.
T * start_writing(void) {
std::lock_guard<std::mutex> lock(m_mutex);
m_write_busy = true;
m_write_idx = 1 - m_read_idx;
return &m_buf[m_write_idx];
}
// The writer thread must call end_writing()
// as the last thing it does
// to release the write busy flag.
void end_writing(void) {
std::lock_guard<std::mutex> lock(m_mutex);
m_write_busy = false;
}
// The reader thread must call start_reading()
// at the start of its iteration to get the pointer
// to the current read buffer.
// If the write thread is not active at this time,
// the read buffer pointer will be set to the
// (previous) write buffer - so the reader gets the latest data.
// If the write buffer is busy, the read pointer is not changed.
// In this case the read buffer may contain stale data,
// it is up to the user to deal with this case.
T * start_reading(void) {
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_write_busy) {
m_read_idx = m_write_idx;
}
return &m_buf[m_read_idx];
}
// The reader thread must call end_reading()
// at the end of its iteration.
void end_reading(void) {
std::lock_guard<std::mutex> lock(m_mutex);
m_read_idx = m_write_idx;
}
private:
T m_buf[2];
bool m_write_busy;
unsigned int m_read_idx, m_write_idx;
std::mutex m_mutex;
};
最佳答案
很有趣的问题!比我最初想象的要棘手:-)
我喜欢无锁解决方案,所以我尝试在下面解决一个。
有很多方法可以考虑这个系统。你可以建模
它作为一个固定大小的循环缓冲区/队列(有两个条目),但随后
您无法更新下一个可用的消费值,
因为你不知道消费者是否已经开始阅读最近发布的
值或仍在(可能)读取前一个。所以额外的状态
需要超出标准环形缓冲区的范围,以达到更优化的
解决方案。
首先请注意,生产者始终可以安全地写入一个单元格
在任何给定的时间点;如果消费者正在读取一个单元格,则
其他可以写入。让我们调用可以安全写入的单元格
“事件”单元格(可以从中读取的单元格是任何不是的单元格
活跃的)。事件单元只有在其他单元没有时才能切换
当前正在读取。
与始终可以写入的事件单元格不同,非事件单元格可以
仅当它包含一个值时才被读取;一旦该值(value)被消耗,它就消失了。
(这意味着在积极的生产者的情况下避免活锁;在某些
点,消费者将清空单元格并停止接触单元格。一次
发生这种情况时,生产者肯定可以发布一个值,而在此之前,
如果消费者不在,它只能发布一个值(更改事件单元格)
阅读中间。)
如果有一个准备好被消费的值,只有消费者可以改变它
事实(对于非事件单元格,无论如何);后续制作可能会更改哪个单元格
是事件的和发布的值,但一个值将始终准备好被读取,直到
它被消耗了。
一旦生产者完成对事件单元格的写入,它可以通过以下方式“发布”这个值
更改哪个单元格是事件单元格(交换索引),前提是消费者是
不是在读取另一个单元格的中间。如果消费者在中间
读取另一个单元格,交换不会发生,但在这种情况下消费者可以交换
读取完值后,前提是生产者不在中间
写(如果是,生产者将在完成后交换)。
事实上,通常消费者在完成阅读后总是可以交换的(如果它是唯一的
访问系统),因为消费者的虚假交换是良性的:如果有
其他单元格中的某些内容,然后交换将导致接下来读取该内容,如果
没有,交换没有任何影响。
所以,我们需要一个共享变量来跟踪事件单元格是什么,我们还需要一个
生产者和消费者表明他们是否在中间的方式
手术。我们可以将这三块状态按顺序存储到一个原子变量中
能够同时(原子地)影响它们。
我们还需要一种方法让消费者检查里面是否有任何东西
首先是非事件单元格,并且两个线程都可以修改该状态
作为适当的。我尝试了其他一些方法,但最后最简单的就是
将此信息也包含在另一个原子变量中。这让事情变得很多
推理更简单,因为系统中的所有状态变化都是原子的。
我想出了一个无等待的实现(无锁,并且所有操作都已完成
在有限数量的指令中)。
代码时间!
#include <atomic>
#include <cstdint>
template <typename T>
class ProducerConsumerDoubleBuffer {
public:
ProducerConsumerDoubleBuffer() : m_state(0) { }
~ProducerConsumerDoubleBuffer() { }
// Never returns nullptr
T* start_writing() {
// Increment active users; once we do this, no one
// can swap the active cell on us until we're done
auto state = m_state.fetch_add(0x2, std::memory_order_relaxed);
return &m_buf[state & 1];
}
void end_writing() {
// We want to swap the active cell, but only if we were the last
// ones concurrently accessing the data (otherwise the consumer
// will do it for us when *it's* done accessing the data)
auto state = m_state.load(std::memory_order_relaxed);
std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));
state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;
if ((state & 0x6) == 0) {
// The consumer wasn't in the middle of a read, we should
// swap (unless the consumer has since started a read or
// already swapped or read a value and is about to swap).
// If we swap, we also want to clear the full flag on what
// will become the active cell, otherwise the consumer could
// eventually read two values out of order (it reads a new
// value, then swaps and reads the old value while the
// producer is idle).
m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);
}
}
// Returns nullptr if there appears to be no more data to read yet
T* start_reading() {
m_readState = m_state.load(std::memory_order_relaxed);
if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
// Nothing to read here!
return nullptr;
}
// At this point, there is guaranteed to be something to
// read, because the full flag is never turned off by the
// producer thread once it's on; the only thing that could
// happen is that the active cell changes, but that can
// only happen after the producer wrote a value into it,
// in which case there's still a value to read, just in a
// different cell.
m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;
// Now that we've incremented the user count, nobody can swap until
// we decrement it
return &m_buf[(m_readState & 1) ^ 1];
}
void end_reading() {
if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
// There was nothing to read; shame to repeat this
// check, but if these functions are inlined it might
// not matter. Otherwise the API could be changed.
// Or just don't call this method if start_reading()
// returns nullptr -- then you could also get rid
// of m_readState.
return;
}
// Alright, at this point the active cell cannot change on
// us, but the active cell's flag could change and the user
// count could change. We want to release our user count
// and remove the flag on the value we read.
auto state = m_state.load(std::memory_order_relaxed);
std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;
state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub;
if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) == 1) {
// Oi, we were the last ones accessing the data when we released our cell.
// That means we should swap, but only if the producer isn't in the middle
// of producing something, and hasn't already swapped, and hasn't already
// set the flag we just reset (which would mean they swapped an even number
// of times). Note that we don't bother swapping if there's nothing to read
// in the other cell.
m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);
}
}
private:
T m_buf[2];
// The bottom (lowest) bit will be the active cell (the one for writing).
// The active cell can only be switched if there's at most one concurrent
// user. The next two bits of state will be the number of concurrent users.
// The fourth bit indicates if there's a value available for reading
// in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].
std::atomic<std::uint32_t> m_state;
std::uint32_t m_readState;
};
ProducerConsumerDoubleBuffer<int> buf;
std::thread producer([&]() {
for (int i = 0; i != 500000; ++i) {
int* item = buf.start_writing();
if (item != nullptr) { // Always true
*item = i;
}
buf.end_writing();
}
});
std::thread consumer([&]() {
int prev = -1;
for (int i = 0; i != 500000; ++i) {
int* item = buf.start_reading();
if (item != nullptr) {
assert(*item > prev);
prev = *item;
}
buf.end_reading();
}
});
producer.join();
consumer.join();
关于c++ - 单生产者,单消费者数据结构,C++中的双缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23666069/
单向链表 单向链表比顺序结构的线性表最大的好处就是不用保证存放的位置,它只需要用指针去指向下一个元素就能搞定。 单链表图解 图画的比较粗糙,简单的讲解一下: 上面四个长方形,每个长方
使用TCP,我正在设计一些类似于next的程序。 客户端在许多线程中的接收正在等待一台服务器的发送消息。但是,这是有条件的。 recv正在等待特定的发送消息。 例如 客户 thread 1: recv
我正在编写正则表达式来验证电子邮件。唯一让我困惑的是: 顶级域名可以使用单个字符吗?(例如:lockevn.c) 背景:我知道顶级域名可以是 2 个字符到任意字符(.uk、.us 到 .canon、.
是否可以在单个定义中定义同一 Controller 的多个路由? 例如: 我想要一个单一的定义 /, /about, /privacy-policy 使用类似的东西 _home: pat
我正在使用 objective-c开发针对 11.4 iOS 的单 View 应用程序,以及 Xcode版本是 9.4.1。 创建后有Main.storyboard和LaunchScreen.stor
我一直在尝试在 shell 程序中实现管道结构,如果我执行简单的命令(例如“hello | rev”),它就可以工作 但是当我尝试执行“head -c 1000000/dev/urandom | wc
此表包含主机和接口(interface)列UNIQUE 组合* 编辑:这个表也有一个自动递增的唯一 ID,抱歉我应该在之前提到这个 ** | host.... | interface..... |
我想将具有固定补丁大小的“std filter”应用于单 channel 图像。 也就是说,我希望 out[i,j] 等于 img[i,j] 附近的像素值的标准值。 对于那些熟悉 Matlab 的人,
假设我想进行网络调用并使用 rx.Single,因为我希望只有一个值。 我如何应用replay().autoConnect() 这样的东西,这样当我从多个来源订阅时网络调用就不会发生多次?我应该使用
我将图像从 rgb 转换为 YUV。现在我想单独找到亮度 channel 的平均值。你能告诉我如何实现这一目标吗?此外,有没有办法确定图像由多少个 channel 组成? 最佳答案 你可以这样做: #
在比较Go和Scala的语句结束检测时,我发现Scala的规则更丰富,即: A line ending is treated as a semicolon unless one of the foll
在IEEE 1800-2005或更高版本中,&和&&二进制运算符有什么区别?它们相等吗? 我注意到,当a和b的类型为bit时,这些coverpoint定义的行为相同: cp: coverpoint a
我正在使用Flutter的provider软件包。我要实现的是为一个 View 或页面提供一个简单的提供程序。因此,我在小部件中尝试了以下操作: Widget build(BuildContext c
我正在尝试在 cython 中使用 openmp。我需要在 cython 中做两件事: i) 在我的 cython 代码中使用 #pragma omp single{} 作用域。 ii) 使用#pra
我正在尝试从转义字符字符串中删除单引号和双引号。它对单引号 ' 或双自动 " 不起作用。 请问有人可以帮忙吗? var mysting = escapedStr.replace(/^%22/g, '
我正在尝试在 cython 中使用 openmp。我需要在 cython 中做两件事: i) 在我的 cython 代码中使用 #pragma omp single{} 作用域。 ii) 使用#pra
我正在使用 ANT+ 协议(protocol),将智能手机与 ANT+ USB 加密狗连接,该加密狗通过 SimulANT+ 连接到 PC。 SimulANT+ 正在模拟一个心率传感器,它将数据发送到
有人可以解释/理解单/多线程模式下计算结果的不同吗? 这是一个大约的例子。圆周率的计算: #include #include #include const int itera(100000000
我编写了一个粗略的阴影映射实现,它使用 6 个不同的 View 矩阵渲染场景 6 次以创建立方体贴图。 作为优化,我正在尝试使用几何着色器升级到单 channel 方法,但很难从我的着色器获得任何输出
尝试使用 Single-Spa 构建一些东西并面临添加到应用程序 AngularJS 的问题。 Angular2 和 ReactJs 工作完美,但如果添加 AngularJS 并尝试为此应用程序使用
我是一名优秀的程序员,十分优秀!