gpt4 book ai didi

C++ 主线程通知线程通知主线程

转载 作者:太空宇宙 更新时间:2023-11-04 13:27:04 24 4
gpt4 key购买 nike

好吧,我需要帮助。我正在尝试做一些具体的事情,但缺乏多线程技能让我很沮丧。

基本上我的主程序/线程需要管理许多必须运行多次的“ channel ”。由于这些运行是独立的,因此每个 channel 都包含一个执行它们的线程。

因此主线程必须等待所有 channel (线程)完成运行才能启动下一个 channel 。所有 channel 都必须等待主线程通知它们可以运行。

下面是我是如何做到的 - 抱歉有点长!

#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <atomic>

std::mutex g_lockprint;
std::mutex g_lockbatch;
std::condition_variable g_nextbatch;
std::mutex g_lockready;
std::condition_variable g_ready;

int global_id = 0;
int nbChannels = 5;
std::atomic<int> nbChannelsLeftToEnd;

class Channel {

private:

int _id;
std::thread _th;
std::atomic<bool> next_batch;
std::atomic<bool> stop_th;

public:

Channel() : _id(global_id++), _th(), next_batch(false), stop_th(false) {}

void go_for_next_batch() { next_batch = true; }

void start(int& start, int &end){
_th = std::thread(&Channel::run, this, std::ref(start), std::ref(end));
}

void stop(){
stop_th = true;
_th.join();
}

void run(int& start, int& end){
while (!stop_th){
{
std::unique_lock<std::mutex> locker(g_lockbatch);
g_nextbatch.wait(locker, [&](){return (next_batch==true); });
}

// print a starting message
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[channel " << _id << "]\trunning in [" << start << "," << end << "]" << std::endl;
}

// simulate work
std::this_thread::sleep_for(std::chrono::seconds(1));

// update the number of channels left to run
nbChannelsLeftToEnd--;
g_ready.notify_one();
next_batch = false;
}
}
};

int main()
{
int end = 100;
int batch = 10;
int startBatch = 0;
int endBatch = startBatch + batch;

// declare some channels (threads)
std::vector<Channel> channels(nbChannels);

// start the threads
for (auto& ch : channels) ch.start(startBatch, endBatch);

while (endBatch<=end){
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[main]\trunning in [" << startBatch << "," << endBatch << "]" << std::endl;
}
nbChannelsLeftToEnd = nbChannels;
for (auto& ch : channels) ch.go_for_next_batch();
g_nextbatch.notify_all();

std::unique_lock<std::mutex> locker(g_lockready);
g_ready.wait(locker, [&](){return (nbChannelsLeftToEnd == 0); });

startBatch += batch;
endBatch += batch;
}

for (auto& ch : channels) ch.stop();

return 0;
}

但有时程序会阻塞,可能是线程相互等待,但我不明白为什么。在任何情况下,加入线程(main 末尾的“停止”方法)都会使我的程序无限期运行,也看不出为什么。

编辑:感谢您的评论和一些研究,我设法获得了一个使用同步屏障的工作程序,因此主线程可以等待所有其他线程完成当前批处理,然后再告诉它们开始下一个。我重复使用了这里有人引用的屏障代码 Anthony Wiiliams's book - 这是障碍:

class barrier
{
unsigned const count;
std::atomic<unsigned> spaces;
std::atomic<unsigned> generation;

public:
explicit barrier(unsigned count_) :
count(count_), spaces(count), generation(0) {}

void wait()
{
unsigned const my_generation = generation;
if (!--spaces)
{
spaces = count;
++generation;
}
else
{
while (generation == my_generation)
std::this_thread::yield();
}
}
};

这是使用屏障的 Channel 类的新 run 方法 - 注意对“stop_th”标志的附加测试。当线程在最后一批之后和加入之前被解除阻塞时,它不应该运行另一批,因此该测试。

void run(int& start, int& end, barrier& b)
{
while (!stop_th){
// wait for next batch notification - use the next_batch flag to avoid
// spurious wake-ups
{
std::unique_lock<std::mutex> locker(g_lockbatch);
g_nextbatch.wait(locker, [&](){return (next_batch==true); });
}

if (stop_th) return;

// simulate work
std::this_thread::sleep_for(std::chrono::seconds(1));

// wait for everyone to meet
next_batch = false;
b.wait();
}
}

最后是主要内容:

int main()
{
int end = 100;
int batch = 10;
int startBatch = 0;
int endBatch = startBatch + batch;

// declare a barrier where all threads will meet
barrier b(nbChannels+1);

// declare some channels (threads)
std::vector<Channel> channels(nbChannels);

// start the threads
for (auto& ch : channels) ch.start(startBatch, endBatch, b);

while (endBatch<=end){

// notify the channels they can process one batch
for (auto& ch : channels) ch.go_for_next_batch();
g_nextbatch.notify_all();

// wait until all threads have finished their batch
b.wait();

// prepare the next one
startBatch += batch;
endBatch += batch;
}

// all channels are blocked by the next_batch condition
// so notify a next batch and join them
for (auto& ch : channels) ch.stop();
for (auto& ch : channels) ch.go_for_next_batch();
g_nextbatch.notify_all();
for (auto& ch : channels) ch.wait_until_stopped();

return 0;
}

再次感谢您的所有评论/回答!!!

最佳答案

我将我的评论更改为答案,因为我修复了 cpp.sh 中的代码并且现在似乎已完成。

关于调用停止时它们不存在。请注意,他们可能仍然在等待下一批锁。考虑添加一个调用来解除他们的锁定,并让他们检查他们是否在锁定步骤后停止。

将停止函数分成两个函数,一个是您更改 bool 值,另一个是您等待的地方。让我们调用这两个函数 stop 和 wait_until_stopped

然后在main函数中加入如下代码。

代替

for (auto& ch : channels) ch.stop();

使用:

for (auto& ch : channels) ch.stop();

for (auto& ch : channels) ch.go_for_next_batch();

g_nextbatch.notify_all();

for (auto& ch : channels) ch.wait_until_stopped();

关于C++ 主线程通知线程通知主线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32986572/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com