gpt4 book ai didi

c++ - std::condition_variable::notify_all() 只唤醒我的线程池中的一个线程

转载 作者:太空宇宙 更新时间:2023-11-04 12:34:09 26 4
gpt4 key购买 nike

我正在尝试编写一个非常简单的线程池来了解它们在幕后是如何工作的。不过,我遇到了一个问题。当我使用 condition_variable 并调用 notify_all() 时,它只会唤醒池中的一个线程。

其他一切正常。我已经排队了 900 个作业,每个作业都有不错的有效负载。唤醒的一个线程消耗了所有这些工作,然后又回到 sleep 状态。在下一个循环中,这一切再次发生。

问题是只有一个线程完成工作!我怎么搞砸了这个模板?

线程池.h:

#pragma once

#include <mutex>
#include <stack>
#include <atomic>
#include <thread>
#include <condition_variable>

class ThreadPool
{
friend void __stdcall ThreadFunc();

public:
static ThreadPool & GetInstance()
{
static ThreadPool sInstance;

return (sInstance);
}

public:
void AddJob(Job * job);
void DoAllJobs();

private:
Job * GetJob();

private:
const static uint32_t ThreadCount = 8;

std::mutex JobMutex;
std::stack<Job *> Jobs;
volatile std::atomic<int> JobWorkCounter;
std::mutex SharedLock;
std::thread Threads[ThreadCount];
std::condition_variable Signal;

private:
ThreadPool();
~ThreadPool();

public:
ThreadPool(ThreadPool const &) = delete;
void operator = (ThreadPool const &) = delete;
};

线程池.cpp:

#include "ThreadPool.h"

void __stdcall ThreadFunc()
{
std::unique_lock<std::mutex> lock(ThreadPool::GetInstance().SharedLock);

while (true)
{
ThreadPool::GetInstance().Signal.wait(lock);

while (Job * job = ThreadPool::GetInstance().GetJob())
{
job->_jobFn(job->_args);
ThreadPool::GetInstance().JobWorkCounter--;
}
}
}

ThreadPool::ThreadPool()
{
JobWorkCounter = 0;

for (uint32_t i = 0; i < ThreadCount; ++i)
Threads[i] = std::thread(ThreadFunc);
}

ThreadPool::~ThreadPool()
{
}

void ThreadPool::AddJob(Job * job)
{
JobWorkCounter++;

JobMutex.lock();
{
Jobs.push(job);
}
JobMutex.unlock();
}

void ThreadPool::DoAllJobs()
{
Signal.notify_all();

while (JobWorkCounter > 0)
{
Sleep(0);
}
}

Job * ThreadPool::GetJob()
{
Job * return_value = nullptr;

JobMutex.lock();
{
if (Jobs.empty() == false)
{
return_value = Jobs.top();
Jobs.pop();
}
}
JobMutex.unlock();

return (return_value);
}

感谢您的帮助!抱歉发布了大代码。

最佳答案

除非您想设计一个新模式,否则使用条件变量的简单“猴子看猴子做”的方法总是有 3 个东西。

一个条件变量、一个互斥锁和一条消息。

std::condition_variable cv;
mutable std::mutex m;
your_message_type message;

然后有 3 种模式可以遵循。发送一条消息:

std::unique_lock l{m}; // C++17, don't need to pass type
set_message_data(message);
cv.notify_one();

发送大量消息:

std::unique_lock l{m};
set_lots_of_message_data(message);
cv.notify_all();

最后,等待和处理消息:

while(true) {
auto data = [&]()->std::optional<data_to_process>{
std::unique_lock l{m};
cv.wait( l, [&]{ return done() || there_is_a_message(message); } );
if (done()) return {};
return get_data_to_process(message);
}();
if (!data) break;
auto& data_to_process = *data;
// process the data
}

有一定的灵 active 。但是有许多规则需要遵循。

  1. 在设置消息数据和通知之间,您必须锁定互斥量。

  2. 您应该始终使用 wait 的 lambda 版本——在没有 lambda 版本的情况下执行此操作意味着您在 100 次中有 99 次做错。

  3. 消息数据应该足以确定是否应该完成一项任务,如果不是因为讨厌的线程和锁等等。

  4. 仅使用 RAII 方式来锁定/解锁互斥量。没有它的正确性几乎是不可能的。

  5. 处理内容时不要持有锁。保持锁定足够长的时间以处理数据,然后放下锁定。

您的代码违反了 2、3、4、5。我认为您没有搞砸 1。

但是,如果您在通知时锁定 cv,那么现代 cv 实现实际上非常高效。

我认为最明显的症状来自 3:您的工作线程始终持有锁,因此只有一个可以进行。其他人会导致您的代码出现其他问题。


现在,超越这种相对简单的模式是可能的。但是一旦你这样做了,你真的需要至少对 C++ 线程模型有一个基本的了解,你不能通过编写代码和“看看它是否有效”来学习。您必须坐下来仔细阅读规范,了解条件变量在标准中的作用,了解互斥量的作用,编写一些代码,坐下来找出为什么它不起作用,找其他人写类似的代码,它有问题,找出其他人如何调试它并发现错误,回到你的代码并找到同样的错误,调整它,然后重复。

这就是我使用条件变量编写原语的原因,我不将条件变量与其他逻辑(例如,维护线程池)混合在一起。

写一个线程安全的队列。它所做的只是维护一个队列,并在有数据要读取时通知消费者。

最简单的一个有 3 个成员变量 -- 一个互斥量、一个条件变量和一个标准队列。

然后用关闭功能来增强它——现在 pop 必须返回一个可选的或有一些其他的失败路径。

您的任务需要先对任务进行批处理,然后再将它们全部解雇。你确定你想要那个吗?为此,我要做的是在线程安全队列中添加一个“推送多个任务”接口(interface)。然后在非线程安全队列中维护“未就绪”任务,并且仅在我们希望线程使用它们时才将它们全部推送。

“线程池”然后消费线程安全队列。因为我们单独编写了线程安全队列,所以移动部件的数量减少了一半,这意味着关系减少了 4 倍。

线程代码很难。尊重它。

关于c++ - std::condition_variable::notify_all() 只唤醒我的线程池中的一个线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57219650/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com