gpt4 book ai didi

用于挂起线程的 C++ 线程池

转载 作者:行者123 更新时间:2023-11-28 04:58:58 24 4
gpt4 key购买 nike

为了执行一些性能测试,我需要在程序中的特定点启动一批线程。不幸的是,我必须采用基于线程的方式并且不能使用任务 (std::async),因为我必须将给定的线程固定到特定的核心(使用亲和性)。为了实现这种行为,我使用了 RAII - Scott Meyers 提到的“一次性”方法

到目前为止,这是我的代码:

template < class T >
typename std::decay< T >::type decay_copy( T&& v ) {
return std::forward< T >( v );
}


/**
* delayed thread - more or less copied from Scott Meyers:
* http://scottmeyers.blogspot.de/2013/12/threadraii-thread-suspension-trouble.html
*/
class del_thread {
private:
using future_t = std::shared_future< void >;
using thread_t = std::thread;

enum execution_state {
WAITING, TRIGGERED, DISMISSED
};

future_t _future;
thread_t _thread;

execution_state _state = WAITING;

public:
del_thread() = delete;
del_thread( del_thread const & ) = delete;
del_thread &operator=( del_thread const & dt ) = delete;

del_thread( del_thread && other ):
_future( std::move( other._future ) ),
_thread( std::move( other._thread ) ),
_state( std::move( other._state ) ) {
other._state = DISMISSED;
}

del_thread &operator=( del_thread && dt ) {
_future = std::move( dt._future );
_thread = std::move( dt._thread );
_state = std::move( dt._state );
dt._state = DISMISSED;
return *this;
}

template< typename op_t >
del_thread( op_t && operation, future_t const & future ):
_thread( [ operation = decay_copy(std::forward< op_t >( operation )),
_future = future,
&_state = _state
]() {
_future.wait();
if( _state == TRIGGERED || _state == DISMISSED ) {
return;
}
_state = TRIGGERED;
operation();
}
) {
}

~del_thread() {
join();
}

void join() {
if( _state == DISMISSED ) {
return;
}
if( _thread.joinable() ) {
_thread.join();
}
}
};

class batch_thread_pool {
private:
std::promise< void > _promise;
std::shared_future< void > _future;
std::vector< del_thread > _pool;

public:
batch_thread_pool() :
_future( _promise.get_future().share() ) {}
template< typename op_t >
void add_thread( op_t && operation ) {
_pool.emplace_back( del_thread(std::forward< op_t >( operation ), std::ref( _future ) ) );
}

void run_batch() {
_promise.set_value();
_pool.clear();
}
};

基本思想是使用 void-future 创建一个挂起的线程,执行线程设置,如设置亲和性和/或优先级,并同时启动所有线程。可以看到,当池被清除时,主线程应该加入添加的线程。为了测试线程池,我写了一个小主程序,看起来像这样:

#include <chrono>
#include <iostream>

#include "threads.h"

void runFunc() {
std::cout << "In runFunc...\n";
return;
}
void run2Func() {
std::cout << "In run2Func...\n";
return;
}

int main() {
batch_thread_pool tp;
tp.add_thread( runFunc );
tp.add_thread( run2Func );
std::cout << "Working while thread 'suspended'...\n";
tp.run_batch();
std::cout << "Working while thread runs asynchronously...\n";
std::this_thread::sleep_for( std::chrono::milliseconds( 500 ) );
std::cout << "Done!\n";
}

不幸的是,线程没有始终如一地启动。有时两种方法(runFunc 和 run2Func)都被执行,有时只执行其中一个。我想是的,因为主线程在连接发生之前就结束了。这是正确的还是有人知道我做错了什么?

真诚的

最佳答案

execution_state _state;

如果要支持这些对象移动,这个不能自动存储。

老实说,我能想到的唯一用途就是ABORT。移出的thread 不可连接...

class del_thread {
private:
using future_t = std::shared_future< void >;
using thread_t = std::thread;

enum execution_state {
WAITING, TRIGGERED, ABORT
};

std::unique_ptr<std::atomic<execution_state>> _state =
std::make_unique<std::atomic<execution_state>>(WAITING);

future_t _future;
thread_t _thread;

public:
void abort() const {
if (_state) *_state = ABORT;
}
del_thread() = delete;
del_thread( del_thread const & ) = delete;
del_thread &operator=( del_thread const & dt ) = delete;

del_thread( del_thread && other ) = default;
del_thread &operator=( del_thread && dt ) = default;

template< class op_t >
del_thread( op_t && operation, future_t const & future ):
_thread(
[
operation = std::forward< op_t >( operation ),
_future = future,
_state = _state.get()
]()
{
_future.wait();
if (*_state == ABORT) return;
*_state = TRIGGERED;
operation();
}
)
{}

~del_thread() {
join();
}

void join() {
if( !_state ) {
return;
}
if( _thread.joinable() ) {
_thread.join();
}
}
};

如果你不想中止:

class del_thread {
private:
using future_t = std::shared_future< void >;
using thread_t = std::thread;

future_t _future;
thread_t _thread;

public:
del_thread() = delete;
del_thread( del_thread const & ) = delete;
del_thread &operator=( del_thread const & dt ) = delete;

del_thread( del_thread && other ) = default;
del_thread &operator=( del_thread && dt ) = default;

class del_thread {
private:
using future_t = std::shared_future< void >;
using thread_t = std::thread;

future_t _future;
thread_t _thread;

public:
del_thread() = delete;
del_thread( del_thread const & ) = delete;
del_thread &operator=( del_thread const & dt ) = delete;

del_thread( del_thread && other ) = default;
del_thread &operator=( del_thread && dt ) = default;

template< class op_t >
del_thread( op_t && operation, future_t const & future ):
_thread(
[
operation = std::forward< op_t >( operation ),
_future = future
]()
{
_future.wait();
operation();
}
)
{}

~del_thread() {
join();
}

void join() {
if( _thread.joinable() ) {
_thread.join();
}
}
};

Live example .

关于用于挂起线程的 C++ 线程池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46529441/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com