gpt4 book ai didi

c++ - 线程池 : Block destruction until all work is done

转载 作者:行者123 更新时间:2023-11-30 03:37:12 26 4
gpt4 key购买 nike

我有以下线程池实现:

template<typename... event_args>
class thread_pool{
public:

using handler_type = std::function<void(event_args...)>;

thread_pool(handler_type&& handler, std::size_t N = 4, bool finish_before_exit = true) : _handler(std::forward<handler_type&&>(handler)),_workers(N),_running(true),_finish_work_before_exit(finish_before_exit)
{
for(auto&& worker: _workers)
{
//worker function
worker = std::thread([this]()
{
while (_running)
{
//wait for work
std::unique_lock<std::mutex> _lk{_wait_mutex};
_cv.wait(_lk, [this]{
return !_events.empty() || !_running;
});
//_lk unlocked

//check to see why we woke up
if (!_events.empty()) {//was it new work
std::unique_lock<std::mutex> _readlk(_queue_mutex);
auto data = _events.front();
_events.pop();
_readlk.unlock();

invoke(std::move(_handler), std::move(data));
_cv.notify_all();
}else if(!_running){//was it a signal to exit
break;
}
//or was it spurious and we should just ignore it
}
});
//end worker function
}
}

~thread_pool()
{
if(_finish_work_before_exit)
{//block destruction until all work is done
std::condition_variable _work_remains;
std::mutex _wr;

std::unique_lock<std::mutex> lk{_wr};
_work_remains.wait(lk,[this](){
return _events.empty();
});
}

_running=false;

//let all workers know to exit
_cv.notify_all();


//attempt to join all workers
for(auto&& _worker: _workers)
{
if(_worker.joinable())
{
_worker.join();
}
}
}

handler_type& handler()
{
return _handler;
}

void propagate(event_args&&... args)
{
//lock before push
std::unique_lock<std::mutex> _lk(_queue_mutex);
{
_events.emplace(std::make_tuple(args...));
}
_lk.unlock();//explicit unlock
_cv.notify_one();//let worker know that data is available
}

private:
bool _finish_work_before_exit;

handler_type _handler;

std::queue<std::tuple<event_args...>> _events;

std::vector<std::thread> _workers;

std::atomic_bool _running;

std::condition_variable _cv;

std::mutex _wait_mutex;

std::mutex _queue_mutex;


//helpers used to unpack tuple into function call
template<typename Func, typename Tuple, std::size_t... I>
auto invoke_(Func&& func, Tuple&& t, std::index_sequence<I...>)
{
return func(std::get<I>(std::forward<Tuple&&>(t))...);
}

template<typename Func, typename Tuple, typename Indicies = std::make_index_sequence<std::tuple_size<Tuple>::value>>
auto invoke(Func&& func, Tuple&& t)
{
return invoke_(std::forward<Func&&>(func), std::forward<Tuple&&>(t), Indicies());
}
};

我最近将此部分添加到析构函数中:

if(_finish_work_before_exit)
{//block destruction until all work is done
std::condition_variable _work_remains;
std::mutex _wr;

std::unique_lock<std::mutex> lk{_wr};
_work_remains.wait(lk,[this](){
return _events.empty();
});
}

目的是让析构函数阻塞,直到工作队列被完全消耗掉。

但是好像让程序陷入了僵局。 a所有的工作都完成了,但等待似乎并没有在工作完成时结束。

考虑这个例子:

std::mutex writemtx;


thread_pool<int> pool{
[&](int i){
std::unique_lock<std::mutex> lk{writemtx};
std::cout<<i<<" : "<<std::this_thread::get_id()<<std::endl;
},
8//threads
};

for (int i=0; i<8192; ++i) {
pool.propagate(std::move(i));
}

如何让析构函数等待工作完成而不造成死锁?

最佳答案

您的代码死锁的原因是 _work_remains 是一个条件变量,您的代码的任何部分都不会“通知”它。您需要将其设为类属性,并让从 _events 中获取最后一个事件的任何线程通知它。

关于c++ - 线程池 : Block destruction until all work is done,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40299150/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com