gpt4 book ai didi

c++ - 如何等待此线程池中的所有任务完成?

转载 作者:行者123 更新时间:2023-11-27 23:36:02 25 4
gpt4 key购买 nike

我正在尝试编写一个ThreadPool

class ThreadPool {
public:
ThreadPool(size_t numberOfThreads):isAlive(true) {
for(int i =0; i < numberOfThreads; i++) {
workerThreads.push_back(std::thread(&ThreadPool::doJob, this));
}

#ifdef DEBUG
std::cout<<"Construction Complete"<<std::endl;
#endif
}

~ThreadPool() {
#ifdef DEBUG
std::cout<<"Destruction Start"<<std::endl;
#endif

isAlive = false;
conditionVariable.notify_all();
waitForExecution();

#ifdef DEBUG
std::cout<<"Destruction Complete"<<std::endl;
#endif
}

void waitForExecution() {
for(std::thread& worker: workerThreads) {
worker.join();
}
}

void addWork(std::function<void()> job) {
#ifdef DEBUG
std::cout<<"Adding work"<<std::endl;
#endif
std::unique_lock<std::mutex> lock(lockListMutex);
jobQueue.push_back(job);
conditionVariable.notify_one();
}

private:
// performs actual work
void doJob() {
// try {
while(isAlive) {
#ifdef DEBUG
std::cout<<"Do Job"<<std::endl;
#endif

std::unique_lock<std::mutex> lock(lockListMutex);
if(!jobQueue.empty()) {
#ifdef DEBUG
std::cout<<"Next Job Found"<<std::endl;
#endif

std::function<void()> job = jobQueue.front();
jobQueue.pop_front();
job();
}
conditionVariable.wait(lock);
}
}

// a vector containing worker threads
std::vector<std::thread> workerThreads;

// a queue for jobs
std::list<std::function<void()>> jobQueue;

// a mutex for synchronized insertion and deletion from list
std::mutex lockListMutex;

std::atomic<bool> isAlive;

// condition variable to track whether or not there is a job in queue
std::condition_variable conditionVariable;
};

我正在从我的主线程向这个线程池添加工作。我的问题是调用 waitForExecution() 导致主线程永远等待。我需要能够在所有工作完成后终止线程并从那里继续执行主线程。我应该如何进行这里操作?

最佳答案

编写健壮的线程池的第一步是将队列从线程管理中分离出来。一个线程安全的队列很难自己编写,并且类似地管理线程。

线程安全队列如下所示:

template<class T>
struct threadsafe_queue {
boost::optional<T> pop() {
std::unique_lock<std::mutex> l(m);
cv.wait(l, [&]{ aborted || !data.empty(); } );
if (aborted) return {};
return data.pop_front();
}

void push( T t )
{
std::unique_lock<std::mutex> l(m);
if (aborted) return;
data.push_front( std::move(t) );
cv.notify_one();
}

void abort()
{
std::unique_lock<std::mutex> l(m);
aborted = true;
data = {};
cv.notify_all();
}
~threadsafe_queue() { abort(); }
private:
std::mutex m;
std::condition_variable cv;
std::queue< T > data;
bool aborted = false;
};

当队列中止时,pop 返回 nullopt

现在我们的线程池很简单了:

struct threadpool {
explicit threadpool(std::size_t n) { add_threads(n); }
threadpool() = default;
~threadpool(){ abort(); }

void add_thread() { add_threads(1); }
void add_threads(std::size_t n)
{
for (std::size_t i = 0; i < n; ++i)
threads.push_back( std::thread( [this]{ do_thread_work(); } ) );
}
template<class F>
auto add_task( F && f )
{
using R = std::result_of_t< F&() >;
auto pptr = std::make_shared<std::promise<R>>();
auto future = pptr.get_future();
tasks.push([pptr]{ (*pptr)(); });
return future;
}
void abort()
{
tasks.abort();
while (!threads.empty()) {
threads.back().join();
threads.pop_back();
}
}
private:
threadsafe_queue< std::function<void()> > tasks;
std::vector< std::thread > threads;
void do_thread_work() {
while (auto f = tasks.pop()) {
(*f)();
}
}
};

请注意,如果您中止,未完成的 future 将充满违约异常。

当它们从中获取数据的队列中止时,工作线程停止运行。 abort() 上的主线程将等待工作线程完成(明智的做法)。

这确实意味着工作线程任务也必须终止,否则主线程将挂起。没有办法避免这种情况;通常,您的工作线程的任务需要合作才能收到一条消息,说明它们应该提前中止。

Boost 有一个线程池,它集成了它的线程原语,并允许较少的协作中止;在其中,所有互斥锁类型的操作都隐式检查中止标志,如果他们看到它,操作就会抛出。

关于c++ - 如何等待此线程池中的所有任务完成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59144237/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com