gpt4 book ai didi

c++ - std::condition_variable – 通知一次但等待线程被唤醒两次

转载 作者:太空狗 更新时间:2023-10-29 21:18:05 24 4
gpt4 key购买 nike

这是一个简单的 C++ 线程池实现。这是一个修改后的版本,源自 https://github.com/progschj/ThreadPool .

#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

namespace ThreadPool {

class FixedThreadPool {
public:
FixedThreadPool(size_t);

template<class F, class... Args>
auto Submit(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;

template<class F, class... Args>
void Execute(F&& f, Args&&... args);

~FixedThreadPool();

void AwaitTermination();

void Stop();

private:
void ThreadWorker();

// need to keep track of threads so we can join them
std::vector<std::thread> workers;

// the task queue
std::queue< std::function<void()> > tasks;

// synchronization
std::mutex worker_mutex;
std::mutex queue_mutex;
std::condition_variable condition;

// stop flag
bool stop_;

// thread size
int thread_size_;
};

// Constructor does nothing. Threads are created when new task submitted.
FixedThreadPool::FixedThreadPool(size_t num_threads):
stop_(false),
thread_size_(num_threads) {}

// Destructor joins all threads
FixedThreadPool::~FixedThreadPool() {
//std::this_thread::sleep_for(std::chrono::seconds(5));
for(std::thread &worker: workers) {
if (worker.joinable()) {
worker.join();
}
}
}

// Thread worker
void FixedThreadPool::ThreadWorker() {
std::function<void()> task;
while (1) {
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]() { return this->stop_ || !this->tasks.empty(); });
printf("wakeeeeeened\n");
if (this->stop_ && this->tasks.empty()) {
printf("returning ...\n");
return;
}
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}

// Add new work item to the pool
template<class F, class... Args>
auto FixedThreadPool::Submit(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type >
{
{
std::unique_lock<std::mutex> lock(this->worker_mutex);
if (workers.size() < thread_size_) {
workers.emplace_back(std::thread(&FixedThreadPool::ThreadWorker, this));
}
}

using return_type = typename std::result_of<F(Args...)>::type;

auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);

std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if(stop_) {
throw std::runtime_error("ThreadPool has been shutdown.");
}
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
}

// Execute new task without returning std::future object.
template<class F, class... Args>
void FixedThreadPool::Execute(F&& f, Args&&... args) {
Submit(std::forward<F>(f), std::forward<Args>(args)...);
}

// Blocks and wait for all previously submitted tasks to be completed.
void FixedThreadPool::AwaitTermination() {
for(std::thread &worker: workers) {
if (worker.joinable()) {
worker.join();
}
}
}

// Shut down the threadpool. This method does not wait for previously submitted
// tasks to be completed.
void FixedThreadPool::Stop() {
printf("Stopping ...\n");
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop_ = true;
}
}


} // namespace ThreadPool

#endif /* __THREAD_POOL_H__ */

和测试 main.cpp:

#include <iostream>
#include <vector>
#include <chrono>
#include <exception>

#include "ThreadPool.h"

int main(int argc, char** argv) {
ThreadPool::FixedThreadPool pool(3);

pool.Execute([]() {
std::cout << "hello world" << std::endl;
}
);
pool.Stop();
pool.AwaitTermination();
std::cout << "All tasks complted." << std::endl;

return 0;
}

我在这个测试程序中有一个错误。只有一个任务被提交到线程池,但我的工作线程被唤醒了两次:

>>./test 
Stopping ...
wakeeeeeened
hello world
wakeeeeeened
returning ...
All tasks complted.

我认为问题出在 FixedThreadPool::ThreadWorker() 本身。工作线程持续等待条件变量以获取新任务。函数 FixedThreadPool::Submit() 添加一个新任务到队列并调用 condition.nofity_one() 来唤醒一个工作线程。

但是我想不通工作线程是如何被唤醒两次的。在此测试中,我只提交了一项任务,并且只有一个工作线程。

最佳答案

将评论转化为答案:

condition_variable::wait(lock, pred) 等同于 while(!pred()) wait(lock);。如果 pred() 返回 true,则实际上不会发生等待,调用会立即返回。

您的第一次唤醒来自 notify_one() 调用;第二个“唤醒”是因为第二个 wait() 调用恰好在 Stop() 调用之后执行,因此您的谓词返回 true 并且wait() 立即返回,无需等待。

很明显,您在这里运气不好:如果第二次 wait() 调用发生在 Stop() 调用之前,那么您的工作线程将永远等待(在没有虚假唤醒的情况下),您的主线程也是如此。

另外,去掉 __THREAD_POOL_H__。把那些双下划线烧到地上。

关于c++ - std::condition_variable – 通知一次但等待线程被唤醒两次,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30988949/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com