gpt4 book ai didi

c++ - 如何正确地在两组线程之间来回传递控制权

转载 作者:太空宇宙 更新时间:2023-11-04 12:58:30 24 4
gpt4 key购买 nike

我只想用 C++ 11 的标准库实现一个线程池。我想公开的接口(interface)是让我的主线程一次提交很多作业,等到所有线程都完成后再继续。这是我第一次显式处理线程,所以不可避免地遇到了一些死锁问题。这是我的代码:

class CrashQueue {
private:
std::vector<std::thread> workers;
std::queue<void*> payloads;
std::function<void(void*)> function;
std::mutex taskFetchingMutex;
long aliveWorkers;

std::condition_variable alarmClock;
std::condition_variable sleepClock;
std::mutex sleepClockMutex;
bool running = true;

public:
CrashQueue(std::size_t threadCount = std::thread::hardware_concurrency()) {
for (std::size_t i = 0; i < threadCount; i ++) {
workers.emplace_back([this]() -> void {
while (running) {
void* payload;
{
std::unique_lock<std::mutex> lock(taskFetchingMutex);
if (payloads.empty()) {
aliveWorkers --;
if (aliveWorkers == 0)
sleepClock.notify_one();
alarmClock.wait(lock);
continue;
}
payload = payloads.front();
payloads.pop();
}

function(payload);
}
});
}
}

~CrashQueue() {
running = false;
alarmClock.notify_all();
for (auto& worker : workers)
worker.join();
}

void run() {
this->aliveWorkers = workers.size();
alarmClock.notify_all();

std::unique_lock<std::mutex> lock(sleepClockMutex);
sleepClock.wait(lock);
}

void commit(std::function<void(void*)>&& function, std::queue<void*>&& payloads) {
this->function = std::move(function);
this->payloads = std::move(payloads);
}
};

我怀疑问题出在工作线程中执行的构造函数的 lambda 表达式中:

if (payloads.empty()) {
aliveWorkers --;
if (aliveWorkers == 0)
sleepClock.notify_one();
alarmClock.wait(lock);
continue;
}

可能是最后一个工作线程唤醒主线程并在主线程唤醒所有其他线程后休眠。尽管如此,这似乎不太可能,但每次我不处于 Debug模式时都会发生死锁。有什么提示吗?

最佳答案

这个问题在某种程度上与我在这里使用两个互斥锁这一事实有关。按如下方式重写 run 使其工作:

void run() {
this->aliveWorkers = workers.size();
alarmClock.notify_all();

while (true) {
std::unique_lock<std::mutex> lock(taskFetchingMutex);
if (aliveWorkers.load() == 0)
break;
sleepClock.wait(lock);
}
}

但是,我无法通过形象化的图片来了解原始代码为何会失败。仍然需要任何解释的答案。

编辑:我看似正确的代码的完整来源:

#include <iostream>
#include <atomic>
#include <vector>
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>
#include <random>
#include <algorithm>
#include <tuple>
#include <queue>

class CrashQueue {
private:
std::vector<std::thread> workers;
std::queue<void*> payloads;
std::function<void(void*)> function;
std::mutex taskFetchingMutex;
std::atomic<long> aliveWorkers;

std::condition_variable alarmClock;
std::condition_variable sleepClock;

bool running = true;

public:
CrashQueue(std::size_t threadCount = std::thread::hardware_concurrency())
: aliveWorkers(threadCount) {
for (std::size_t i = 0; i < threadCount; i ++) {
workers.emplace_back([this]() -> void {
while (running) {
void* payload;
{
std::unique_lock<std::mutex> lock(taskFetchingMutex);
if (payloads.empty()) {
aliveWorkers.fetch_sub(1);
sleepClock.notify_one();

alarmClock.wait(lock);
continue;
}
payload = payloads.front();
payloads.pop();
}

function(payload);
}
});
}

// Make sure all workers finished running.
while (aliveWorkers.load() > 0);
std::unique_lock<std::mutex> lock(taskFetchingMutex);
}

~CrashQueue() {
running = false;
alarmClock.notify_all();
for (auto& worker : workers)
worker.join();
}

void run() {
this->aliveWorkers = workers.size();
alarmClock.notify_all();

while (true) {
std::unique_lock<std::mutex> lock(taskFetchingMutex);
if (aliveWorkers.load() == 0)
break;
sleepClock.wait(lock);
}
}

void commit(std::function<void(void*)>&& function, std::queue<void*>&& payloads) {
this->function = std::move(function);
this->payloads = std::move(payloads);
}
};

关于c++ - 如何正确地在两组线程之间来回传递控制权,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45428845/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com