gpt4 book ai didi

c++ - 单生产者,单消费者数据结构,C++中的双缓冲区

转载 作者:IT老高 更新时间:2023-10-28 12:54:38 26 4
gpt4 key购买 nike

我在 $work 有一个应用程序,我必须在两个以不同频率调度的实时线程之间移动。 (实际调度超出了我的控制。)应用程序是硬实时的(其中一个线程必须驱动硬件接口(interface)),因此线程之间的数据传输应该是无锁和无等待的尽可能。

需要注意的是,只需要传输一个数据块:因为两个线程的运行速率不同,所以在较慢线程的两次唤醒之间,会出现较快线程的两次迭代完成的情况;在这种情况下,可以覆盖写入缓冲区中的数据,以便较慢的线程仅获取最新数据。

换句话说,代替队列,双缓冲解决方案就足够了。这两个缓冲区是在初始化期间分配的,读写线程可以调用类的方法来获取指向这些缓冲区之一的指针。

C++代码:

#include <mutex>

template <typename T>
class ProducerConsumerDoubleBuffer {
public:
ProducerConsumerDoubleBuffer() {
m_write_busy = false;
m_read_idx = m_write_idx = 0;
}

~ProducerConsumerDoubleBuffer() { }

// The writer thread using this class must call
// start_writing() at the start of its iteration
// before doing anything else to get the pointer
// to the current write buffer.
T * start_writing(void) {
std::lock_guard<std::mutex> lock(m_mutex);

m_write_busy = true;
m_write_idx = 1 - m_read_idx;

return &m_buf[m_write_idx];
}
// The writer thread must call end_writing()
// as the last thing it does
// to release the write busy flag.
void end_writing(void) {
std::lock_guard<std::mutex> lock(m_mutex);

m_write_busy = false;
}

// The reader thread must call start_reading()
// at the start of its iteration to get the pointer
// to the current read buffer.
// If the write thread is not active at this time,
// the read buffer pointer will be set to the
// (previous) write buffer - so the reader gets the latest data.
// If the write buffer is busy, the read pointer is not changed.
// In this case the read buffer may contain stale data,
// it is up to the user to deal with this case.
T * start_reading(void) {
std::lock_guard<std::mutex> lock(m_mutex);

if (!m_write_busy) {
m_read_idx = m_write_idx;
}

return &m_buf[m_read_idx];
}
// The reader thread must call end_reading()
// at the end of its iteration.
void end_reading(void) {
std::lock_guard<std::mutex> lock(m_mutex);

m_read_idx = m_write_idx;
}

private:
T m_buf[2];
bool m_write_busy;
unsigned int m_read_idx, m_write_idx;
std::mutex m_mutex;
};

为了避免读取器线程中的陈旧数据,有效载荷结构被版本化。
为了促进线程之间的双向数据传输,使用了上述怪物的两个实例,方向相反。

问题:
  • 这个方案是线程安全的吗?如果坏了,在哪里?
  • 没有互斥锁可以做到吗?也许只有内存屏障或 CAS 指令?
  • 可以做得更好吗?
  • 最佳答案

    很有趣的问题!比我最初想象的要棘手:-)
    我喜欢无锁解决方案,所以我尝试在下面解决一个。

    有很多方法可以考虑这个系统。你可以建模
    它作为一个固定大小的循环缓冲区/队列(有两个条目),但随后
    您无法更新下一个可用的消费值,
    因为你不知道消费者是否已经开始阅读最近发布的
    值或仍在(可能)读取前一个。所以额外的状态
    需要超出标准环形缓冲区的范围,以达到更优化的
    解决方案。

    首先请注意,生产者始终可以安全地写入一个单元格
    在任何给定的时间点;如果消费者正在读取一个单元格,则
    其他可以写入。让我们调用可以安全写入的单元格
    “事件”单元格(可以从中读取的单元格是任何不是的单元格
    活跃的)。事件单元只有在其他单元没有时才能切换
    当前正在读取。

    与始终可以写入的事件单元格不同,非事件单元格可以
    仅当它包含一个值时才被读取;一旦该值(value)被消耗,它就消失了。
    (这意味着在积极的生产者的情况下避免活锁;在某些
    点,消费者将清空单元格并停止接触单元格。一次
    发生这种情况时,生产者肯定可以发布一个值,而在此之前,
    如果消费者不在,它只能发布一个值(更改事件单元格)
    阅读中间。)

    如果有一个准备好被消费的值,只有消费者可以改变它
    事实(对于非事件单元格,无论如何);后续制作可能会更改哪个单元格
    是事件的和发布的值,但一个值将始终准备好被读取,直到
    它被消耗了。

    一旦生产者完成对事件单元格的写入,它可以通过以下方式“发布”这个值
    更改哪个单元格是事件单元格(交换索引),前提是消费者是
    不是在读取另一个单元格的中间。如果消费者在中间
    读取另一个单元格,交换不会发生,但在这种情况下消费者可以交换
    读取完值后,前提是生产者不在中间
    写(如果是,生产者将在完成后交换)。
    事实上,通常消费者在完成阅读后总是可以交换的(如果它是唯一的
    访问系统),因为消费者的虚假交换是良性的:如果有
    其他单元格中的某些内容,然后交换将导致接下来读取该内容,如果
    没有,交换没有任何影响。

    所以,我们需要一个共享变量来跟踪事件单元格是什么,我们还需要一个
    生产者和消费者表明他们是否在中间的方式
    手术。我们可以将这三块状态按顺序存储到一个原子变量中
    能够同时(原子地)影响它们。
    我们还需要一种方法让消费者检查里面是否有任何东西
    首先是非事件单元格,并且两个线程都可以修改该状态
    作为适当的。我尝试了其他一些方法,但最后最简单的就是
    将此信息也包含在另一个原子变量中。这让事情变得很多
    推理更简单,因为系统中的所有状态变化都是原子的。

    我想出了一个无等待的实现(无锁,并且所有操作都已完成
    在有限数量的指令中)。

    代码时间!

    #include <atomic>
    #include <cstdint>

    template <typename T>
    class ProducerConsumerDoubleBuffer {
    public:
    ProducerConsumerDoubleBuffer() : m_state(0) { }
    ~ProducerConsumerDoubleBuffer() { }

    // Never returns nullptr
    T* start_writing() {
    // Increment active users; once we do this, no one
    // can swap the active cell on us until we're done
    auto state = m_state.fetch_add(0x2, std::memory_order_relaxed);
    return &m_buf[state & 1];
    }

    void end_writing() {
    // We want to swap the active cell, but only if we were the last
    // ones concurrently accessing the data (otherwise the consumer
    // will do it for us when *it's* done accessing the data)

    auto state = m_state.load(std::memory_order_relaxed);
    std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));
    state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;
    if ((state & 0x6) == 0) {
    // The consumer wasn't in the middle of a read, we should
    // swap (unless the consumer has since started a read or
    // already swapped or read a value and is about to swap).
    // If we swap, we also want to clear the full flag on what
    // will become the active cell, otherwise the consumer could
    // eventually read two values out of order (it reads a new
    // value, then swaps and reads the old value while the
    // producer is idle).
    m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);
    }
    }

    // Returns nullptr if there appears to be no more data to read yet
    T* start_reading() {
    m_readState = m_state.load(std::memory_order_relaxed);
    if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
    // Nothing to read here!
    return nullptr;
    }

    // At this point, there is guaranteed to be something to
    // read, because the full flag is never turned off by the
    // producer thread once it's on; the only thing that could
    // happen is that the active cell changes, but that can
    // only happen after the producer wrote a value into it,
    // in which case there's still a value to read, just in a
    // different cell.

    m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;

    // Now that we've incremented the user count, nobody can swap until
    // we decrement it
    return &m_buf[(m_readState & 1) ^ 1];
    }

    void end_reading() {
    if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
    // There was nothing to read; shame to repeat this
    // check, but if these functions are inlined it might
    // not matter. Otherwise the API could be changed.
    // Or just don't call this method if start_reading()
    // returns nullptr -- then you could also get rid
    // of m_readState.
    return;
    }

    // Alright, at this point the active cell cannot change on
    // us, but the active cell's flag could change and the user
    // count could change. We want to release our user count
    // and remove the flag on the value we read.

    auto state = m_state.load(std::memory_order_relaxed);
    std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;
    state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub;
    if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) == 1) {
    // Oi, we were the last ones accessing the data when we released our cell.
    // That means we should swap, but only if the producer isn't in the middle
    // of producing something, and hasn't already swapped, and hasn't already
    // set the flag we just reset (which would mean they swapped an even number
    // of times). Note that we don't bother swapping if there's nothing to read
    // in the other cell.
    m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);
    }
    }

    private:
    T m_buf[2];

    // The bottom (lowest) bit will be the active cell (the one for writing).
    // The active cell can only be switched if there's at most one concurrent
    // user. The next two bits of state will be the number of concurrent users.
    // The fourth bit indicates if there's a value available for reading
    // in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].
    std::atomic<std::uint32_t> m_state;

    std::uint32_t m_readState;
    };

    请注意,语义是这样的,消费者永远不能两次读取给定的值,
    并且它读取的值总是比它读取的最后一个值新。也还算可以
    内存使用效率高(两个缓冲区,就像您的原始解决方案一样)。我避免了 CAS 循环
    因为它们通常比争用的单个原子操作效率低。

    如果你决定使用上面的代码,我建议你先为它编写一些全面的(线程)单元测试。
    和适当的基准。我确实测试过它,但只是勉强。如果您发现任何错误,请告诉我:-)

    我的单元测试:
    ProducerConsumerDoubleBuffer<int> buf;
    std::thread producer([&]() {
    for (int i = 0; i != 500000; ++i) {
    int* item = buf.start_writing();
    if (item != nullptr) { // Always true
    *item = i;
    }
    buf.end_writing();
    }
    });
    std::thread consumer([&]() {
    int prev = -1;
    for (int i = 0; i != 500000; ++i) {
    int* item = buf.start_reading();
    if (item != nullptr) {
    assert(*item > prev);
    prev = *item;
    }
    buf.end_reading();
    }
    });
    producer.join();
    consumer.join();

    至于你原来的实现,我只是粗略地看了一遍(这样更有趣
    设计新东西,呵呵),但 david.pfx 的答案似乎解决了您问题的那部分。

    关于c++ - 单生产者,单消费者数据结构,C++中的双缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23666069/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com