gpt4 book ai didi

c++ - 固定大小的线程安全队列

转载 作者:行者123 更新时间:2023-11-30 03:41:47 24 4
gpt4 key购买 nike

我想做的是使用多个线程将整数推送到我的 threadSafe 队列实现,并与另一系列线程同时弹出插入的数字。所有这些操作都必须是线程安全的,但我想要的另一个选择是队列的大小必须是固定的,就像缓冲区一样。如果缓冲区已满,所有推送线程必须等待弹出线程释放一些槽。

这是我对队列/缓冲区的实现,它似乎可以工作,但在几次迭代后它停止并保持阻塞状态,没有任何错误。

#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>

template <typename T>

class Queue
{
private:
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable cond_;

public:

T pop()
{
std::unique_lock<std::mutex> mlock(mutex_);

cond_.wait(mlock, [this]{return !queue_.empty();});

auto val = queue_.front();
queue_.pop();
return val;
}

void pop(T& item)
{
std::unique_lock<std::mutex> mlock(mutex_);

cond_.wait(mlock, [this]{return !queue_.empty();});

item = queue_.front();
queue_.pop();
}

void push(const T& item, int buffer)
{
std::unique_lock<std::mutex> mlock(mutex_);

while (queue_.size() >= buffer)
{
cond_.wait(mlock);
}

queue_.push(item);
mlock.unlock();
cond_.notify_one();
}

Queue()=default;
Queue(const Queue&) = delete; // disable copying
Queue& operator=(const Queue&) = delete; // disable assignment

};

缓冲区的大小在 push 函数中用变量 buffer 定义。这是一个用法示例:

 void prepare(Queue<int>& loaded, int buffer, int num_frames)
{
for (int i = 0; i < num_frames; i++)
{
cout<< "push "<<i<<endl;
loaded.push(i, buffer);
}
}

void load (vector<Frame>& movie, Queue<int>& loaded, int num_frames,
int num_points, int buffer, int height, int width)
{
for (int i = 0; i < num_frames; i++)
{
int num = loaded.pop();
cout<< "pop "<<num<<endl;
}
}

int main()
{
srand(time(NULL));

int num_threadsXstage = 4;

int width = 500;
int height = 500;

int num_points = width * height;

int num_frames = 100;

int frames_thread = num_frames/num_threadsXstage;

int preset = 3;

int buffer = 10;

//Vectors of threads
vector<thread> loader;

//Final vector
vector<Frame> movie;
movie.resize(num_frames);

//Working queues
Queue<int> loaded;

//Prepare loading queue task
thread preparator(prepare, ref(loaded), buffer, num_frames);

for (int i = 0; i < num_threadsXstage; i++)
{
//stage 1
loader.push_back(thread(&load, ref(movie), ref(loaded), frames_thread,
num_points, buffer, height, width));

}


// JOIN
preparator.join();

join_all(loader);

return 0;
}

最佳答案

您的pop 函数可以让等待push 的线程取得进展,但它们不会调用任何notify 函数。任何时候您都必须调用适当的 notify 函数,使阻塞在条件变量上的线程有可能继续前进。

虽然解释原因相当复杂,但您应该在仍然持有锁的情况下调用 notify_all 或调用 notify_one。否则理论上可以“唤醒错误的线程”,因为您对两个谓词使用相同的条件变量(队列不为空且队列未满)。

为避免难以理解的故障模式,请始终执行以下三项操作之一:

  1. 不要使用同一个条件变量来处理多个谓词。例如,对“非空”使用一个条件变量,对“未满”使用另一个条件变量;
  2. 始终使用notify_all,从不使用notify_one;或
  3. 始终在持有互斥量的同时调用通知函数。

只要您至少遵循这三个规则中的一个,就可以避免模糊的故障模式,在这种模式下,您只唤醒一个在释放互斥量后选择休眠的线程,同时留下唯一可以处理该条件的线程仍然阻塞.

关于c++ - 固定大小的线程安全队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37172549/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com