gpt4 book ai didi

c++ - 有效地等待线程池中的所有任务完成

转载 作者:可可西里 更新时间:2023-11-01 15:06:26 27 4
gpt4 key购买 nike

我目前有一个程序在我的线程池中有 x 个 worker 。在主循环中,y 任务被分配给工作人员完成,但任务发出后,我必须等待所有任务完成,然后才能继续执行程序。我相信我当前的解决方案效率低下,必须有更好的方法来等待所有任务完成,但我不确定如何去做

// called in main after all tasks are enqueued to 
// std::deque<std::function<void()>> tasks
void ThreadPool::waitFinished()
{
while(!tasks.empty()) //check if there are any tasks in queue waiting to be picked up
{
//do literally nothing
}
}

更多信息:

线程池结构

//worker thread objects
class Worker {
public:
Worker(ThreadPool& s): pool(s) {}
void operator()();
private:
ThreadPool &pool;
};

//thread pool
class ThreadPool {
public:
ThreadPool(size_t);
template<class F>
void enqueue(F f);
void waitFinished();
~ThreadPool();
private:
friend class Worker;
//keeps track of threads so we can join
std::vector< std::thread > workers;
//task queue
std::deque< std::function<void()> > tasks;
//sync
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};

或者这里是 gist我的 threadpool.hpp

我想使用 waitFinished() 的示例:

while(running)
//....
for all particles alive
push particle position function to threadpool
end for

threadPool.waitFinished();

push new particle position data into openGL buffer
end while

这样我就可以发送成百上千个粒子位置任务来并行完成,等待它们完成并将新数据放入 openGL 位置缓冲区

最佳答案

这是完成您正在尝试的事情的一种方法。在同一个互斥量上使用两个条件变量并不适合轻松的人,除非您知道内部发生了什么。除了我想要证明每次运行之间完成了多少项之外,我不需要原子处理成员。

其中的示例工作负载函数生成一百万个随机 int 值,然后对它们进行排序(必须以某种方式加热我的办公室)。 waitFinished 不会返回,直到队列为空并且没有线程正忙。

#include <iostream>
#include <deque>
#include <functional>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <random>

//thread pool
class ThreadPool
{
public:
ThreadPool(unsigned int n = std::thread::hardware_concurrency());

template<class F> void enqueue(F&& f);
void waitFinished();
~ThreadPool();

unsigned int getProcessed() const { return processed; }

private:
std::vector< std::thread > workers;
std::deque< std::function<void()> > tasks;
std::mutex queue_mutex;
std::condition_variable cv_task;
std::condition_variable cv_finished;
std::atomic_uint processed;
unsigned int busy;
bool stop;

void thread_proc();
};

ThreadPool::ThreadPool(unsigned int n)
: busy()
, processed()
, stop()
{
for (unsigned int i=0; i<n; ++i)
workers.emplace_back(std::bind(&ThreadPool::thread_proc, this));
}

ThreadPool::~ThreadPool()
{
// set stop-condition
std::unique_lock<std::mutex> latch(queue_mutex);
stop = true;
cv_task.notify_all();
latch.unlock();

// all threads terminate, then we're done.
for (auto& t : workers)
t.join();
}

void ThreadPool::thread_proc()
{
while (true)
{
std::unique_lock<std::mutex> latch(queue_mutex);
cv_task.wait(latch, [this](){ return stop || !tasks.empty(); });
if (!tasks.empty())
{
// got work. set busy.
++busy;

// pull from queue
auto fn = tasks.front();
tasks.pop_front();

// release lock. run async
latch.unlock();

// run function outside context
fn();
++processed;

latch.lock();
--busy;
cv_finished.notify_one();
}
else if (stop)
{
break;
}
}
}

// generic function push
template<class F>
void ThreadPool::enqueue(F&& f)
{
std::unique_lock<std::mutex> lock(queue_mutex);
tasks.emplace_back(std::forward<F>(f));
cv_task.notify_one();
}

// waits until the queue is empty.
void ThreadPool::waitFinished()
{
std::unique_lock<std::mutex> lock(queue_mutex);
cv_finished.wait(lock, [this](){ return tasks.empty() && (busy == 0); });
}

// a cpu-busy task.
void work_proc()
{
std::random_device rd;
std::mt19937 rng(rd());

// build a vector of random numbers
std::vector<int> data;
data.reserve(100000);
std::generate_n(std::back_inserter(data), data.capacity(), [&](){ return rng(); });
std::sort(data.begin(), data.end(), std::greater<int>());
}

int main()
{
ThreadPool tp;

// run five batches of 100 items
for (int x=0; x<5; ++x)
{
// queue 100 work tasks
for (int i=0; i<100; ++i)
tp.enqueue(work_proc);

tp.waitFinished();
std::cout << tp.getProcessed() << '\n';
}

// destructor will close down thread pool
return EXIT_SUCCESS;
}

输出

100
200
300
400
500

祝你好运。

关于c++ - 有效地等待线程池中的所有任务完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23896421/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com